beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/4] beam git commit: Move Common Fn Execution Concepts to fn-execution
Date Thu, 23 Nov 2017 00:17:22 GMT
Repository: beam
Updated Branches:
  refs/heads/master 5c74022da -> 4847d34d7


http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestExecutors.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestExecutors.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestExecutors.java
new file mode 100644
index 0000000..68f1583
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestExecutors.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.test;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * Creates {@link TestRule}s which validate that every submitted task completed and the executor
+ * shut down cleanly. This allows for testing that tasks have exercised the shutdown logic.
+ */
+public class TestExecutors {
+  public static TestExecutorService from(final ExecutorService staticExecutorService) {
+    return from(new Supplier<ExecutorService>() {
+      @Override
+      public ExecutorService get() {
+        return staticExecutorService;
+      }
+    });
+  }
+
+  public static TestExecutorService from(Supplier<ExecutorService> executorServiceSuppler) {
+    return new FromSupplier(executorServiceSuppler);
+  }
+
+  /** A union of the {@link ExecutorService} and {@link TestRule} interfaces. */
+  public interface TestExecutorService extends ExecutorService, TestRule {}
+
+  private static class FromSupplier extends ForwardingExecutorService
+      implements TestExecutorService {
+    private final Supplier<ExecutorService> executorServiceSupplier;
+    private ExecutorService delegate;
+
+    private FromSupplier(Supplier<ExecutorService> executorServiceSupplier) {
+      this.executorServiceSupplier = executorServiceSupplier;
+    }
+
+    @Override
+    public Statement apply(final Statement statement, Description arg1) {
+      return new Statement() {
+        @Override
+        public void evaluate() throws Throwable {
+          Throwable thrown = null;
+          delegate = executorServiceSupplier.get();
+          try {
+            statement.evaluate();
+          } catch (Throwable t) {
+            thrown = t;
+          }
+          shutdown();
+          if (!awaitTermination(5, TimeUnit.SECONDS)) {
+            shutdownNow();
+            IllegalStateException e =
+                new IllegalStateException("Test executor failed to shutdown cleanly.");
+            if (thrown != null) {
+              thrown.addSuppressed(e);
+            } else {
+              thrown = e;
+            }
+          }
+          if (thrown != null) {
+            throw thrown;
+          }
+        }
+      };
+    }
+
+    @Override
+    protected ExecutorService delegate() {
+      return delegate;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestExecutorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestExecutorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestExecutorsTest.java
new file mode 100644
index 0000000..50c75b8
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestExecutorsTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.model.Statement;
+
+/** Tests for {@link TestExecutors}. */
+@RunWith(JUnit4.class)
+public class TestExecutorsTest {
+  @Test
+  public void testSuccessfulTermination() throws Throwable {
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    final TestExecutorService testService = TestExecutors.from(service);
+    final AtomicBoolean taskRan = new AtomicBoolean();
+    testService
+        .apply(
+            new Statement() {
+              @Override
+              public void evaluate() throws Throwable {
+                testService.submit(new Runnable() {
+                  @Override
+                  public void run() {
+                    taskRan.set(true);
+                  }
+                });
+              }
+            },
+            null)
+        .evaluate();
+    assertTrue(service.isTerminated());
+    assertTrue(taskRan.get());
+  }
+
+  @Test
+  public void testTaskBlocksForeverCausesFailure() throws Throwable {
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    final TestExecutorService testService = TestExecutors.from(service);
+    final AtomicBoolean taskStarted = new AtomicBoolean();
+    final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
+    try {
+      testService
+          .apply(
+              new Statement() {
+                @Override
+                public void evaluate() throws Throwable {
+                  testService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                      taskToRun();
+                    }
+                  });
+                }
+
+                private void taskToRun() {
+                  taskStarted.set(true);
+                  try {
+                    while (true) {
+                      Thread.sleep(10000);
+                    }
+                  } catch (InterruptedException e) {
+                    taskWasInterrupted.set(true);
+                    return;
+                  }
+                }
+              },
+              null)
+          .evaluate();
+      fail();
+    } catch (IllegalStateException e) {
+      assertEquals(IllegalStateException.class, e.getClass());
+      assertEquals("Test executor failed to shutdown cleanly.", e.getMessage());
+    }
+    assertTrue(service.isShutdown());
+  }
+
+  @Test
+  public void testStatementFailurePropagatedCleanly() throws Throwable {
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    final TestExecutorService testService = TestExecutors.from(service);
+    final RuntimeException exceptionToThrow = new RuntimeException();
+    try {
+      testService
+          .apply(
+              new Statement() {
+                @Override
+                public void evaluate() throws Throwable {
+                  throw exceptionToThrow;
+                }
+              },
+              null)
+          .evaluate();
+      fail();
+    } catch (RuntimeException thrownException) {
+      assertSame(exceptionToThrow, thrownException);
+    }
+    assertTrue(service.isShutdown());
+  }
+
+  @Test
+  public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate()
+      throws Throwable {
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    final TestExecutorService testService = TestExecutors.from(service);
+    final AtomicBoolean taskStarted = new AtomicBoolean();
+    final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
+    final RuntimeException exceptionToThrow = new RuntimeException();
+    try {
+      testService
+          .apply(
+              new Statement() {
+                @Override
+                public void evaluate() throws Throwable {
+                  testService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                      taskToRun();
+                    }
+                  });
+                  throw exceptionToThrow;
+                }
+
+                private void taskToRun() {
+                  taskStarted.set(true);
+                  try {
+                    while (true) {
+                      Thread.sleep(10000);
+                    }
+                  } catch (InterruptedException e) {
+                    taskWasInterrupted.set(true);
+                    return;
+                  }
+                }
+              },
+              null)
+          .evaluate();
+      fail();
+    } catch (RuntimeException thrownException) {
+      assertSame(exceptionToThrow, thrownException);
+      assertEquals(1, exceptionToThrow.getSuppressed().length);
+      assertEquals(IllegalStateException.class, exceptionToThrow.getSuppressed()[0].getClass());
+      assertEquals(
+          "Test executor failed to shutdown cleanly.",
+          exceptionToThrow.getSuppressed()[0].getMessage());
+    }
+    assertTrue(service.isShutdown());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java
new file mode 100644
index 0000000..5df505b
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.test;
+
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+
+/** Utility methods which enable testing of {@link StreamObserver}s. */
+public class TestStreams {
+  /**
+   * Creates a {@link Builder} for a test {@link CallStreamObserver} that forwards
+   * {@link StreamObserver#onNext} calls to the supplied {@link Consumer}.
+   */
+  public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
+    return new Builder<>(new ForwardingCallStreamObserver<>(
+        onNext,
+        TestStreams.<Throwable>noopConsumer(),
+        TestStreams.noopRunnable(),
+        TestStreams.alwaysTrueSupplier()));
+  }
+
+  /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */
+  public static class Builder<T> {
+    private final ForwardingCallStreamObserver<T> observer;
+    private Builder(ForwardingCallStreamObserver<T> observer) {
+      this.observer = observer;
+    }
+
+    /**
+     * Returns a new {@link Builder} like this one with the specified
+     * {@link CallStreamObserver#isReady} callback.
+     */
+    public Builder<T> withIsReady(Supplier<Boolean> isReady) {
+      return new Builder<>(new ForwardingCallStreamObserver<>(
+          observer.onNext,
+          observer.onError,
+          observer.onCompleted,
+          isReady));
+    }
+
+    /**
+     * Returns a new {@link Builder} like this one with the specified
+     * {@link StreamObserver#onCompleted} callback.
+     */
+    public Builder<T> withOnCompleted(Runnable onCompleted) {
+      return new Builder<>(new ForwardingCallStreamObserver<>(
+          observer.onNext,
+          observer.onError,
+          onCompleted,
+          observer.isReady));
+    }
+
+    /**
+     * Returns a new {@link Builder} like this one with the specified
+     * {@link StreamObserver#onError} callback.
+     */
+    public Builder<T> withOnError(final Runnable onError) {
+      return new Builder<>(new ForwardingCallStreamObserver<>(
+          observer.onNext,
+          new Consumer<Throwable>() {
+            @Override
+            public void accept(Throwable t) {
+              onError.run();
+            }
+          },
+          observer.onCompleted,
+          observer.isReady));
+    }
+
+    /**
+     * Returns a new {@link Builder} like this one with the specified
+     * {@link StreamObserver#onError} consumer.
+     */
+    public Builder<T> withOnError(Consumer<Throwable> onError) {
+      return new Builder<>(new ForwardingCallStreamObserver<>(
+          observer.onNext, onError, observer.onCompleted, observer.isReady));
+    }
+
+    public CallStreamObserver<T> build() {
+      return observer;
+    }
+  }
+
+  private static void noop() {
+  }
+
+  private static Runnable noopRunnable() {
+    return new Runnable() {
+      @Override
+      public void run() {
+      }
+    };
+  }
+
+  private static void noop(Throwable t) {
+  }
+
+  private static <T> Consumer<T> noopConsumer() {
+    return new Consumer<T>() {
+      @Override
+      public void accept(T item) {
+      }
+    };
+  }
+
+  private static boolean returnTrue() {
+    return true;
+  }
+
+  private static Supplier<Boolean> alwaysTrueSupplier() {
+    return new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return true;
+      }
+    };
+  }
+
+  /** A {@link CallStreamObserver} which executes the supplied callbacks. */
+  private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> {
+    private final Consumer<T> onNext;
+    private final Supplier<Boolean> isReady;
+    private final Consumer<Throwable> onError;
+    private final Runnable onCompleted;
+
+    public ForwardingCallStreamObserver(
+        Consumer<T> onNext,
+        Consumer<Throwable> onError,
+        Runnable onCompleted,
+        Supplier<Boolean> isReady) {
+      this.onNext = onNext;
+      this.onError = onError;
+      this.onCompleted = onCompleted;
+      this.isReady = isReady;
+    }
+
+    @Override
+    public void onNext(T value) {
+      onNext.accept(value);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      onError.accept(t);
+    }
+
+    @Override
+    public void onCompleted() {
+      onCompleted.run();
+    }
+
+    @Override
+    public boolean isReady() {
+      return isReady.get();
+    }
+
+    @Override
+    public void setOnReadyHandler(Runnable onReadyHandler) {}
+
+    @Override
+    public void disableAutoInboundFlowControl() {}
+
+    @Override
+    public void request(int count) {}
+
+    @Override
+    public void setMessageCompression(boolean enable) {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreamsTest.java
new file mode 100644
index 0000000..e386e55
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreamsTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link TestStreams}. */
+@RunWith(JUnit4.class)
+public class TestStreamsTest {
+  @Test
+  public void testOnNextIsCalled() {
+    final AtomicBoolean onNextWasCalled = new AtomicBoolean();
+    TestStreams.withOnNext(new Consumer<Boolean>() {
+      @Override
+      public void accept(Boolean item) {
+        onNextWasCalled.set(item);
+      }
+    }).build().onNext(true);
+    assertTrue(onNextWasCalled.get());
+  }
+
+  @Test
+  public void testIsReadyIsCalled() {
+    final AtomicBoolean isReadyWasCalled = new AtomicBoolean();
+    assertFalse(TestStreams.withOnNext(null)
+        .withIsReady(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return isReadyWasCalled.getAndSet(true);
+          }
+        })
+        .build()
+        .isReady());
+    assertTrue(isReadyWasCalled.get());
+  }
+
+  @Test
+  public void testOnCompletedIsCalled() {
+    final AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
+    TestStreams.withOnNext(null)
+        .withOnCompleted(new Runnable() {
+          @Override
+          public void run() {
+            onCompletedWasCalled.set(true);
+          }
+        })
+        .build()
+        .onCompleted();
+    assertTrue(onCompletedWasCalled.get());
+  }
+
+  @Test
+  public void testOnErrorRunnableIsCalled() {
+    RuntimeException throwable = new RuntimeException();
+    final AtomicBoolean onErrorWasCalled = new AtomicBoolean();
+    TestStreams.withOnNext(null)
+        .withOnError(new Runnable() {
+          @Override
+          public void run() {
+            onErrorWasCalled.set(true);
+          }
+        })
+        .build()
+        .onError(throwable);
+    assertTrue(onErrorWasCalled.get());
+  }
+
+  @Test
+  public void testOnErrorConsumerIsCalled() {
+    RuntimeException throwable = new RuntimeException();
+    final Collection<Throwable> onErrorWasCalled = new ArrayList<>();
+    TestStreams.withOnNext(null)
+        .withOnError(new Consumer<Throwable>() {
+          @Override
+          public void accept(Throwable item) {
+            onErrorWasCalled.add(item);
+          }
+        })
+        .build()
+        .onError(throwable);
+    assertThat(onErrorWasCalled, Matchers.<Throwable>contains(throwable));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index b644266..1df2623 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -30,12 +30,12 @@ import org.apache.beam.fn.harness.fn.ThrowingFunction;
 import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.fn.harness.stream.StreamObserverFactory;
-import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse.Builder;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
index 7064db4..f2e852c 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
@@ -26,11 +26,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
-import org.apache.beam.fn.harness.stream.DataStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.Builder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.stream.DataStreams;
 
 /**
  * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java
deleted file mode 100644
index 2007139..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.stream;
-
-import java.util.concurrent.Phaser;
-
-/**
- * A {@link Phaser} which never terminates. The default {@link Phaser} implementation terminates
- * after the first advancement.
- */
-public final class AdvancingPhaser extends Phaser {
-  public AdvancingPhaser(int numParties) {
-    super(numParties);
-  }
-
-  @Override
-  protected boolean onAdvance(int phase, int registeredParties) {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
deleted file mode 100644
index cd96440..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.stream;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.annotations.VisibleForTesting;
-import io.grpc.stub.CallStreamObserver;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * A thread safe {@link StreamObserver} which uses a bounded queue to pass elements to a processing
- * thread responsible for interacting with the underlying {@link CallStreamObserver}.
- *
- * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
- * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready. Callers
- * are expected to advance the {@link Phaser} whenever the underlying {@link CallStreamObserver}
- * becomes ready.
- */
-@ThreadSafe
-public final class BufferingStreamObserver<T> implements StreamObserver<T> {
-  private static final Object POISON_PILL = new Object();
-  private final LinkedBlockingDeque<T> queue;
-  private final Phaser phaser;
-  private final CallStreamObserver<T> outboundObserver;
-  private final Future<?> queueDrainer;
-  private final int bufferSize;
-
-  public BufferingStreamObserver(
-      Phaser phaser,
-      CallStreamObserver<T> outboundObserver,
-      ExecutorService executor,
-      int bufferSize) {
-    this.phaser = phaser;
-    this.bufferSize = bufferSize;
-    this.queue = new LinkedBlockingDeque<>(bufferSize);
-    this.outboundObserver = outboundObserver;
-    this.queueDrainer = executor.submit(this::drainQueue);
-  }
-
-  private void drainQueue() {
-    try {
-      while (true) {
-        int currentPhase = phaser.getPhase();
-        while (outboundObserver.isReady()) {
-          T value = queue.take();
-          if (value != POISON_PILL) {
-            outboundObserver.onNext(value);
-          } else {
-            return;
-          }
-        }
-        phaser.awaitAdvance(currentPhase);
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(e);
-    }
-  }
-
-  @Override
-  public void onNext(T value) {
-    try {
-      // Attempt to add an element to the bounded queue occasionally checking to see
-      // if the queue drainer is still alive.
-      while (!queue.offer(value, 60, TimeUnit.SECONDS)) {
-        checkState(!queueDrainer.isDone(), "Stream observer has finished.");
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void onError(Throwable t) {
-    synchronized (outboundObserver) {
-      // If we are done, then a previous caller has already shutdown the queue processing thread
-      // hence we don't need to do it again.
-      if (!queueDrainer.isDone()) {
-        // We check to see if we were able to successfully insert the poison pill at the front of
-        // the queue to cancel the processing thread eagerly or if the processing thread is done.
-        try {
-          // We shouldn't attempt to insert into the queue if the queue drainer thread is done
-          // since the queue may be full and nothing will be emptying it.
-          while (!queueDrainer.isDone()
-              && !queue.offerFirst((T) POISON_PILL, 60, TimeUnit.SECONDS)) {
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        }
-        waitTillFinish();
-      }
-      outboundObserver.onError(t);
-    }
-  }
-
-  @Override
-  public void onCompleted() {
-    synchronized (outboundObserver) {
-      // If we are done, then a previous caller has already shutdown the queue processing thread
-      // hence we don't need to do it again.
-      if (!queueDrainer.isDone()) {
-        // We check to see if we were able to successfully insert the poison pill at the end of
-        // the queue forcing the remainder of the elements to be processed or if the processing
-        // thread is done.
-        try {
-          // We shouldn't attempt to insert into the queue if the queue drainer thread is done
-          // since the queue may be full and nothing will be emptying it.
-          while (!queueDrainer.isDone()
-              && !queue.offerLast((T) POISON_PILL, 60, TimeUnit.SECONDS)) {
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        }
-        waitTillFinish();
-      }
-      outboundObserver.onCompleted();
-    }
-  }
-
-  @VisibleForTesting
-  public int getBufferSize() {
-    return bufferSize;
-  }
-
-  private void waitTillFinish() {
-    try {
-      queueDrainer.get();
-    } catch (CancellationException e) {
-      // Cancellation is expected
-      return;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    } catch (ExecutionException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
deleted file mode 100644
index 3ecd303..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.fn.harness.stream;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingInputStream;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PushbackInputStream;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.BlockingQueue;
-import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
-import org.apache.beam.sdk.coders.Coder;
-
-/**
- * {@link #inbound(Iterator)} treats multiple {@link ByteString}s as a single input stream and
- * {@link #outbound(CloseableThrowingConsumer)} treats a single {@link OutputStream} as multiple
- * {@link ByteString}s.
- */
-public class DataStreams {
-  /**
-   * Converts multiple {@link ByteString}s into a single {@link InputStream}.
-   *
-   * <p>The iterator is accessed lazily. The supplied {@link Iterator} should block until
-   * either it knows that no more values will be provided or it has the next {@link ByteString}.
-   */
-  public static InputStream inbound(Iterator<ByteString> bytes) {
-    return new Inbound(bytes);
-  }
-
-  /**
-   * Converts a single {@link OutputStream} into multiple {@link ByteString}s.
-   */
-  public static OutputStream outbound(CloseableThrowingConsumer<ByteString> consumer) {
-    // TODO: Migrate logic from BeamFnDataBufferingOutboundObserver
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * An input stream which concatenates multiple {@link ByteString}s. Lazily accesses the
-   * first {@link Iterator} on first access of this input stream.
-   *
-   * <p>Closing this input stream has no effect.
-   */
-  private static class Inbound<T> extends InputStream {
-    private static final InputStream EMPTY_STREAM = new InputStream() {
-      @Override
-      public int read() throws IOException {
-        return -1;
-      }
-    };
-
-    private final Iterator<ByteString> bytes;
-    private InputStream currentStream;
-
-    public Inbound(Iterator<ByteString> bytes) {
-      this.currentStream = EMPTY_STREAM;
-      this.bytes = bytes;
-    }
-
-    @Override
-    public int read() throws IOException {
-      int rval = -1;
-      // Move on to the next stream if we have read nothing
-      while ((rval = currentStream.read()) == -1 && bytes.hasNext()) {
-        currentStream = bytes.next().newInput();
-      }
-      return rval;
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-      int remainingLen = len;
-      while ((remainingLen -= ByteStreams.read(
-          currentStream, b, off + len - remainingLen, remainingLen)) > 0) {
-        if (bytes.hasNext()) {
-          currentStream = bytes.next().newInput();
-        } else {
-          int bytesRead = len - remainingLen;
-          return bytesRead > 0 ? bytesRead : -1;
-        }
-      }
-      return len - remainingLen;
-    }
-  }
-
-  /**
-   * An adapter which converts an {@link InputStream} to an {@link Iterator} of {@code T} values
-   * using the specified {@link Coder}.
-   *
-   * <p>Note that this adapter follows the Beam Fn API specification for forcing values that decode
-   * consuming zero bytes to consume exactly one byte.
-   *
-   * <p>Note that access to the underlying {@link InputStream} is lazy and will only be invoked on
-   * first access to {@link #next()} or {@link #hasNext()}.
-   */
-  public static class DataStreamDecoder<T> implements Iterator<T> {
-    private enum State { READ_REQUIRED, HAS_NEXT, EOF };
-
-    private final CountingInputStream countingInputStream;
-    private final PushbackInputStream pushbackInputStream;
-    private final Coder<T> coder;
-    private State currentState;
-    private T next;
-    public DataStreamDecoder(Coder<T> coder, InputStream inputStream) {
-      this.currentState = State.READ_REQUIRED;
-      this.coder = coder;
-      this.pushbackInputStream = new PushbackInputStream(inputStream, 1);
-      this.countingInputStream = new CountingInputStream(pushbackInputStream);
-    }
-
-    @Override
-    public boolean hasNext() {
-      switch (currentState) {
-        case EOF:
-          return false;
-        case READ_REQUIRED:
-          try {
-            int nextByte = pushbackInputStream.read();
-            if (nextByte == -1) {
-              currentState = State.EOF;
-              return false;
-            }
-
-            pushbackInputStream.unread(nextByte);
-            long count = countingInputStream.getCount();
-            next = coder.decode(countingInputStream);
-            // Skip one byte if decoding the value consumed 0 bytes.
-            if (countingInputStream.getCount() - count == 0) {
-              checkState(countingInputStream.read() != -1, "Unexpected EOF reached");
-            }
-            currentState = State.HAS_NEXT;
-          } catch (IOException e) {
-            throw new IllegalStateException(e);
-          }
-          return true;
-        case HAS_NEXT:
-          return true;
-      }
-      throw new IllegalStateException(String.format("Unknown state %s", currentState));
-    }
-
-    @Override
-    public T next() {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
-      }
-      currentState = State.READ_REQUIRED;
-      return next;
-    }
-  }
-
-  /**
-   * Allows for one or more writing threads to append values to this iterator while one reading
-   * thread reads values. {@link #hasNext()} and {@link #next()} will block until a value is
-   * available or this has been closed.
-   *
-   * <p>External synchronization must be provided if multiple readers would like to access the
-   * {@link Iterator#hasNext()} and {@link Iterator#next()} methods.
-   *
-   * <p>The order of values which are appended to this iterator is nondeterministic when multiple
-   * threads call {@link #accept(Object)}.
-   */
-  public static class BlockingQueueIterator<T> implements
-      CloseableThrowingConsumer<T>, Iterator<T> {
-    private static final Object POISION_PILL = new Object();
-    private final BlockingQueue<T> queue;
-
-    /** Only accessed by {@link Iterator#hasNext()} and {@link Iterator#next()} methods. */
-    private T currentElement;
-
-    public BlockingQueueIterator(BlockingQueue<T> queue) {
-      this.queue = queue;
-    }
-
-    @Override
-    public void close() throws Exception {
-      queue.put((T) POISION_PILL);
-    }
-
-    @Override
-    public void accept(T t) throws Exception {
-      queue.put(t);
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (currentElement == null) {
-        try {
-          currentElement = queue.take();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IllegalStateException(e);
-        }
-      }
-      return currentElement != POISION_PILL;
-    }
-
-    @Override
-    public T next() {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
-      }
-      T rval = currentElement;
-      currentElement = null;
-      return rval;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
deleted file mode 100644
index 82a1aa4..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.stream;
-
-import io.grpc.stub.CallStreamObserver;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.Phaser;
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * A {@link StreamObserver} which uses synchronization on the underlying
- * {@link CallStreamObserver} to provide thread safety.
- *
- * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
- * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready.
- * Creator is expected to advance the {@link Phaser} whenever the underlying
- * {@link CallStreamObserver} becomes ready.
- */
-@ThreadSafe
-public final class DirectStreamObserver<T> implements StreamObserver<T> {
-  private final Phaser phaser;
-  private final CallStreamObserver<T> outboundObserver;
-
-  public DirectStreamObserver(
-      Phaser phaser,
-      CallStreamObserver<T> outboundObserver) {
-    this.phaser = phaser;
-    this.outboundObserver = outboundObserver;
-  }
-
-  @Override
-  public void onNext(T value) {
-    int phase = phaser.getPhase();
-    if (!outboundObserver.isReady()) {
-      phaser.awaitAdvance(phase);
-    }
-    synchronized (outboundObserver) {
-      outboundObserver.onNext(value);
-    }
-  }
-
-  @Override
-  public void onError(Throwable t) {
-    synchronized (outboundObserver) {
-      outboundObserver.onError(t);
-    }
-  }
-
-  @Override
-  public void onCompleted() {
-    synchronized (outboundObserver) {
-      outboundObserver.onCompleted();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java
deleted file mode 100644
index ef641b0..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.stream;
-
-import io.grpc.stub.ClientCallStreamObserver;
-import io.grpc.stub.ClientResponseObserver;
-import io.grpc.stub.StreamObserver;
-
-/**
- * A {@link ClientResponseObserver} which delegates all {@link StreamObserver} calls.
- *
- * <p>Used to wrap existing {@link StreamObserver}s to be able to install an
- * {@link ClientCallStreamObserver#setOnReadyHandler(Runnable) onReadyHandler}.
- *
- * <p>This is as thread-safe as the underlying stream observer that is being wrapped.
- */
-final class ForwardingClientResponseObserver<ReqT, RespT>
-    implements ClientResponseObserver<RespT, ReqT> {
-  private final Runnable onReadyHandler;
-  private final StreamObserver<ReqT> inboundObserver;
-
-  ForwardingClientResponseObserver(
-      StreamObserver<ReqT> inboundObserver, Runnable onReadyHandler) {
-    this.inboundObserver = inboundObserver;
-    this.onReadyHandler = onReadyHandler;
-  }
-
-  @Override
-  public void onNext(ReqT value) {
-    inboundObserver.onNext(value);
-  }
-
-  @Override
-  public void onError(Throwable t) {
-    inboundObserver.onError(t);
-  }
-
-  @Override
-  public void onCompleted() {
-    inboundObserver.onCompleted();
-  }
-
-  @Override
-  public void beforeStart(ClientCallStreamObserver<RespT> stream) {
-    stream.setOnReadyHandler(onReadyHandler);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
index 99e33c2..1d9c29b 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
@@ -24,6 +24,10 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
+import org.apache.beam.sdk.fn.stream.BufferingStreamObserver;
+import org.apache.beam.sdk.fn.stream.DirectStreamObserver;
+import org.apache.beam.sdk.fn.stream.ForwardingClientResponseObserver;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 
@@ -58,9 +62,11 @@ public abstract class StreamObserverFactory {
         Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
         StreamObserver<ReqT> inboundObserver) {
       AdvancingPhaser phaser = new AdvancingPhaser(1);
-      CallStreamObserver<RespT> outboundObserver = (CallStreamObserver<RespT>) clientFactory.apply(
-          new ForwardingClientResponseObserver<ReqT, RespT>(
-              inboundObserver, phaser::arrive));
+      CallStreamObserver<RespT> outboundObserver =
+          (CallStreamObserver<RespT>)
+              clientFactory.apply(
+                  ForwardingClientResponseObserver.<ReqT, RespT>create(
+                      inboundObserver, phaser::arrive));
       return new DirectStreamObserver<>(phaser, outboundObserver);
     }
   }
@@ -80,9 +86,11 @@ public abstract class StreamObserverFactory {
         Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
         StreamObserver<ReqT> inboundObserver) {
       AdvancingPhaser phaser = new AdvancingPhaser(1);
-      CallStreamObserver<RespT> outboundObserver = (CallStreamObserver<RespT>) clientFactory.apply(
-          new ForwardingClientResponseObserver<ReqT, RespT>(
-              inboundObserver, phaser::arrive));
+      CallStreamObserver<RespT> outboundObserver =
+          (CallStreamObserver<RespT>)
+              clientFactory.apply(
+                  ForwardingClientResponseObserver.<ReqT, RespT>create(
+                      inboundObserver, phaser::arrive));
       return new BufferingStreamObserver<>(
           phaser, outboundObserver, executorService, bufferSize);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index f00346d..b7d0bb0 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -49,8 +49,6 @@ import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.harness.test.TestExecutors;
-import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -58,6 +56,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
index c926414..f984601 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
@@ -28,8 +28,6 @@ import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import org.apache.beam.harness.test.Consumer;
-import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
@@ -38,6 +36,8 @@ import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.fn.test.Consumer;
+import org.apache.beam.sdk.fn.test.TestStreams;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
index 56ae7ed..26b5026 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
@@ -39,10 +39,10 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.beam.fn.harness.fn.ThrowingFunction;
-import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.test.TestStreams;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
index aa1a504..a23cb2a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
@@ -23,11 +23,11 @@ import static org.junit.Assert.assertEquals;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import org.apache.beam.harness.test.TestExecutors;
-import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.RegisterResponse;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
index 81b1aa4..74dec1e 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
@@ -29,11 +29,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
-import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.test.TestStreams;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
index 9e21398..55c0b43 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -44,8 +44,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.harness.test.Consumer;
-import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
 import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
@@ -53,6 +51,8 @@ import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.test.Consumer;
+import org.apache.beam.sdk.fn.test.TestStreams;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
index 6a12ed0..94e561d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
@@ -30,9 +30,9 @@ import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.test.TestStreams;
 import org.apache.beam.sdk.values.KV;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index 1e68b18..4d28b80 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -41,10 +41,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.LogManager;
 import java.util.logging.LogRecord;
-import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.test.TestStreams;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Rule;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
index 12c9c43..86a24b2 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
@@ -40,11 +40,11 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Function;
 import org.apache.beam.fn.harness.IdGenerator;
-import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.test.TestStreams;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.After;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/AdvancingPhaserTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/AdvancingPhaserTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/AdvancingPhaserTest.java
deleted file mode 100644
index 3dd1b42..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/AdvancingPhaserTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.stream;
-
-import static org.hamcrest.collection.IsEmptyCollection.empty;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link AdvancingPhaser}. */
-@RunWith(JUnit4.class)
-public class AdvancingPhaserTest {
-  @Test
-  public void testAdvancement() throws Exception {
-    AdvancingPhaser phaser = new AdvancingPhaser(1);
-    int currentPhase = phaser.getPhase();
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    service.submit(phaser::arrive);
-    phaser.awaitAdvance(currentPhase);
-    assertFalse(phaser.isTerminated());
-    service.shutdown();
-    if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
-      assertThat(service.shutdownNow(), empty());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
deleted file mode 100644
index 96648e9..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.stream;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.harness.test.Consumer;
-import org.apache.beam.harness.test.TestExecutors;
-import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
-import org.apache.beam.harness.test.TestStreams;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link BufferingStreamObserver}. */
-@RunWith(JUnit4.class)
-public class BufferingStreamObserverTest {
-  @Rule public TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
-
-  @Test
-  public void testThreadSafety() throws Exception {
-    List<String> onNextValues = new ArrayList<>();
-    AdvancingPhaser phaser = new AdvancingPhaser(1);
-    final AtomicBoolean isCriticalSectionShared = new AtomicBoolean();
-    final BufferingStreamObserver<String> streamObserver =
-        new BufferingStreamObserver<>(
-            phaser,
-            TestStreams.withOnNext(
-                new Consumer<String>() {
-                  @Override
-                  public void accept(String t) {
-                    // Use the atomic boolean to detect if multiple threads are in this
-                    // critical section. Any thread that enters purposefully blocks by sleeping
-                    // to increase the contention between threads artificially.
-                    assertFalse(isCriticalSectionShared.getAndSet(true));
-                    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
-                    onNextValues.add(t);
-                    assertTrue(isCriticalSectionShared.getAndSet(false));
-                  }
-                }).build(),
-            executor,
-            3);
-
-    List<String> prefixes = ImmutableList.of("0", "1", "2", "3", "4");
-    List<Callable<String>> tasks = new ArrayList<>();
-    for (String prefix : prefixes) {
-      tasks.add(
-          new Callable<String>() {
-            @Override
-            public String call() throws Exception {
-              for (int i = 0; i < 10; i++) {
-                streamObserver.onNext(prefix + i);
-              }
-              return prefix;
-            }
-          });
-    }
-    List<Future<String>> results = executor.invokeAll(tasks);
-    for (Future<String> result : results) {
-      result.get();
-    }
-    streamObserver.onCompleted();
-
-    // Check that order was maintained.
-    int[] prefixesIndex = new int[prefixes.size()];
-    assertEquals(50, onNextValues.size());
-    for (String onNextValue : onNextValues) {
-      int prefix = Integer.parseInt(onNextValue.substring(0, 1));
-      int suffix = Integer.parseInt(onNextValue.substring(1, 2));
-      assertEquals(prefixesIndex[prefix], suffix);
-      prefixesIndex[prefix] += 1;
-    }
-  }
-
-  @Test
-  public void testIsReadyIsHonored() throws Exception {
-    AdvancingPhaser phaser = new AdvancingPhaser(1);
-    final AtomicBoolean elementsAllowed = new AtomicBoolean();
-    final BufferingStreamObserver<String> streamObserver =
-        new BufferingStreamObserver<>(
-            phaser,
-            TestStreams.withOnNext(
-                new Consumer<String>() {
-                  @Override
-                  public void accept(String t) {
-                    assertTrue(elementsAllowed.get());
-                  }
-                }).withIsReady(elementsAllowed::get).build(),
-            executor,
-            3);
-
-    // Start all the tasks
-    List<Future<String>> results = new ArrayList<>();
-    for (String prefix : ImmutableList.of("0", "1", "2", "3", "4")) {
-      results.add(
-          executor.submit(
-              new Callable<String>() {
-                @Override
-                public String call() throws Exception {
-                  for (int i = 0; i < 10; i++) {
-                    streamObserver.onNext(prefix + i);
-                  }
-                  return prefix;
-                }
-              }));
-    }
-
-    // Have them wait and then flip that we do allow elements and wake up those awaiting
-    Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
-    elementsAllowed.set(true);
-    phaser.arrive();
-
-    for (Future<String> result : results) {
-      result.get();
-    }
-    streamObserver.onCompleted();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
deleted file mode 100644
index f7a87e1..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DataStreamsTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.fn.harness.stream;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assume.assumeTrue;
-
-import com.google.common.collect.Iterators;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
-import com.google.protobuf.ByteString;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.SynchronousQueue;
-import org.apache.beam.fn.harness.stream.DataStreams.BlockingQueueIterator;
-import org.apache.beam.fn.harness.stream.DataStreams.DataStreamDecoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DataStreams}. */
-@RunWith(Enclosed.class)
-public class DataStreamsTest {
-
-  /** Tests for {@link DataStreams.Inbound}. */
-  @RunWith(JUnit4.class)
-  public static class InboundTest {
-    private static final ByteString BYTES_A = ByteString.copyFromUtf8("TestData");
-    private static final ByteString BYTES_B = ByteString.copyFromUtf8("SomeOtherTestData");
-
-    @Test
-    public void testEmptyRead() throws Exception {
-      assertEquals(ByteString.EMPTY, read());
-      assertEquals(ByteString.EMPTY, read(ByteString.EMPTY));
-      assertEquals(ByteString.EMPTY, read(ByteString.EMPTY, ByteString.EMPTY));
-    }
-
-    @Test
-    public void testRead() throws Exception {
-      assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B));
-      assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, ByteString.EMPTY, BYTES_B));
-      assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B, ByteString.EMPTY));
-    }
-
-    private static ByteString read(ByteString... bytes) throws IOException {
-      return ByteString.readFrom(DataStreams.inbound(Arrays.asList(bytes).iterator()));
-    }
-  }
-
-  /** Tests for {@link DataStreams.BlockingQueueIterator}. */
-  @RunWith(JUnit4.class)
-  public static class BlockingQueueIteratorTest {
-    @Test(timeout = 10_000)
-    public void testBlockingQueueIteratorWithoutBlocking() throws Exception {
-      BlockingQueueIterator<String> iterator =
-          new BlockingQueueIterator<>(new ArrayBlockingQueue<>(3));
-
-      iterator.accept("A");
-      iterator.accept("B");
-      iterator.close();
-
-      assertEquals(Arrays.asList("A", "B"),
-          Arrays.asList(Iterators.toArray(iterator, String.class)));
-    }
-
-    @Test(timeout = 10_000)
-    public void testBlockingQueueIteratorWithBlocking() throws Exception {
-      // The synchronous queue only allows for one element to transfer at a time and blocks
-      // the sending/receiving parties until both parties are there.
-      final BlockingQueueIterator<String> iterator =
-          new BlockingQueueIterator<>(new SynchronousQueue<>());
-      final CompletableFuture<List<String>> valuesFuture = new CompletableFuture<>();
-      Thread appender = new Thread() {
-        @Override
-        public void run() {
-          valuesFuture.complete(Arrays.asList(Iterators.toArray(iterator, String.class)));
-        }
-      };
-      appender.start();
-      iterator.accept("A");
-      iterator.accept("B");
-      iterator.close();
-      assertEquals(Arrays.asList("A", "B"), valuesFuture.get());
-      appender.join();
-    }
-  }
-
-  /** Tests for {@link DataStreams.DataStreamDecoder}. */
-  @RunWith(JUnit4.class)
-  public static class DataStreamDecoderTest {
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
-    @Test
-    public void testEmptyInputStream() throws Exception {
-      testDecoderWith(StringUtf8Coder.of());
-    }
-
-    @Test
-    public void testNonEmptyInputStream() throws Exception {
-      testDecoderWith(StringUtf8Coder.of(), "A", "BC", "DEF", "GHIJ");
-    }
-
-    @Test
-    public void testNonEmptyInputStreamWithZeroLengthCoder() throws Exception {
-      CountingOutputStream countingOutputStream =
-          new CountingOutputStream(ByteStreams.nullOutputStream());
-      GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, countingOutputStream);
-      assumeTrue(countingOutputStream.getCount() == 0);
-
-      testDecoderWith(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, GlobalWindow.INSTANCE);
-    }
-
-    private <T> void testDecoderWith(Coder<T> coder, T... expected) throws IOException {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      for (T value : expected) {
-        int size = baos.size();
-        coder.encode(value, baos);
-        // Pad an arbitrary byte when values encode to zero bytes
-        if (baos.size() - size == 0) {
-          baos.write(0);
-        }
-      }
-
-      Iterator<T> decoder =
-          new DataStreamDecoder<>(coder, new ByteArrayInputStream(baos.toByteArray()));
-
-      Object[] actual = Iterators.toArray(decoder, Object.class);
-      assertArrayEquals(expected, actual);
-
-      assertFalse(decoder.hasNext());
-      assertFalse(decoder.hasNext());
-
-      thrown.expect(NoSuchElementException.class);
-      decoder.next();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
deleted file mode 100644
index 05d8d5a..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.stream;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.harness.test.Consumer;
-import org.apache.beam.harness.test.TestExecutors;
-import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
-import org.apache.beam.harness.test.TestStreams;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link DirectStreamObserver}. */
-@RunWith(JUnit4.class)
-public class DirectStreamObserverTest {
-  @Rule public TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
-
-  @Test
-  public void testThreadSafety() throws Exception {
-    List<String> onNextValues = new ArrayList<>();
-    AdvancingPhaser phaser = new AdvancingPhaser(1);
-    final AtomicBoolean isCriticalSectionShared = new AtomicBoolean();
-    final DirectStreamObserver<String> streamObserver =
-        new DirectStreamObserver<>(
-            phaser,
-            TestStreams.withOnNext(
-                new Consumer<String>() {
-                  @Override
-                  public void accept(String t) {
-                    // Use the atomic boolean to detect if multiple threads are in this
-                    // critical section. Any thread that enters purposefully blocks by sleeping
-                    // to increase the contention between threads artificially.
-                    assertFalse(isCriticalSectionShared.getAndSet(true));
-                    Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-                    onNextValues.add(t);
-                    assertTrue(isCriticalSectionShared.getAndSet(false));
-                  }
-                }).build());
-
-    List<String> prefixes = ImmutableList.of("0", "1", "2", "3", "4");
-    List<Callable<String>> tasks = new ArrayList<>();
-    for (String prefix : prefixes) {
-      tasks.add(
-          new Callable<String>() {
-            @Override
-            public String call() throws Exception {
-              for (int i = 0; i < 10; i++) {
-                streamObserver.onNext(prefix + i);
-              }
-              return prefix;
-            }
-          });
-    }
-    executor.invokeAll(tasks);
-    streamObserver.onCompleted();
-
-    // Check that order was maintained.
-    int[] prefixesIndex = new int[prefixes.size()];
-    assertEquals(50, onNextValues.size());
-    for (String onNextValue : onNextValues) {
-      int prefix = Integer.parseInt(onNextValue.substring(0, 1));
-      int suffix = Integer.parseInt(onNextValue.substring(1, 2));
-      assertEquals(prefixesIndex[prefix], suffix);
-      prefixesIndex[prefix] += 1;
-    }
-  }
-
-  @Test
-  public void testIsReadyIsHonored() throws Exception {
-    AdvancingPhaser phaser = new AdvancingPhaser(1);
-    final AtomicBoolean elementsAllowed = new AtomicBoolean();
-    final DirectStreamObserver<String> streamObserver =
-        new DirectStreamObserver<>(
-            phaser,
-            TestStreams.withOnNext(
-                new Consumer<String>() {
-                  @Override
-                  public void accept(String t) {
-                    assertTrue(elementsAllowed.get());
-                  }
-                }).withIsReady(elementsAllowed::get).build());
-
-    // Start all the tasks
-    List<Future<String>> results = new ArrayList<>();
-    for (String prefix : ImmutableList.of("0", "1", "2", "3", "4")) {
-      results.add(
-          executor.submit(
-              new Callable<String>() {
-                @Override
-                public String call() throws Exception {
-                  for (int i = 0; i < 10; i++) {
-                    streamObserver.onNext(prefix + i);
-                  }
-                  return prefix;
-                }
-              }));
-    }
-
-    // Have them wait and then flip that we do allow elements and wake up those awaiting
-    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-    elementsAllowed.set(true);
-    phaser.arrive();
-
-    for (Future<String> result : results) {
-      result.get();
-    }
-    streamObserver.onCompleted();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserverTest.java
deleted file mode 100644
index 598d7f3..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserverTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.stream;
-
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-import io.grpc.stub.ClientCallStreamObserver;
-import io.grpc.stub.ClientResponseObserver;
-import io.grpc.stub.StreamObserver;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/** Tests for {@link ForwardingClientResponseObserver}. */
-@RunWith(JUnit4.class)
-public class ForwardingClientResponseObserverTest {
-  @Test
-  public void testCallsAreForwardedAndOnReadyHandlerBound() {
-    @SuppressWarnings("unchecked")
-    StreamObserver<Object> delegateObserver = Mockito.mock(StreamObserver.class);
-    @SuppressWarnings("unchecked")
-    ClientCallStreamObserver<Object> callStreamObserver =
-        Mockito.mock(ClientCallStreamObserver.class);
-    Runnable onReadyHandler = new Runnable() {
-      @Override
-      public void run() {
-      }
-    };
-    ClientResponseObserver<Object, Object> observer =
-        new ForwardingClientResponseObserver<>(delegateObserver, onReadyHandler);
-    observer.onNext("A");
-    verify(delegateObserver).onNext("A");
-    Throwable t = new RuntimeException();
-    observer.onError(t);
-    verify(delegateObserver).onError(t);
-    observer.onCompleted();
-    verify(delegateObserver).onCompleted();
-    observer.beforeStart(callStreamObserver);
-    verify(callStreamObserver).setOnReadyHandler(onReadyHandler);
-    verifyNoMoreInteractions(delegateObserver, callStreamObserver);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/StreamObserverFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/StreamObserverFactoryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/StreamObserverFactoryTest.java
index 9331079..f80e8c4 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/StreamObserverFactoryTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/StreamObserverFactoryTest.java
@@ -24,6 +24,9 @@ import static org.junit.Assert.assertThat;
 
 import io.grpc.stub.CallStreamObserver;
 import io.grpc.stub.StreamObserver;
+import org.apache.beam.sdk.fn.stream.BufferingStreamObserver;
+import org.apache.beam.sdk.fn.stream.DirectStreamObserver;
+import org.apache.beam.sdk.fn.stream.ForwardingClientResponseObserver;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Before;
 import org.junit.Test;


Mime
View raw message