beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [02/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam
Date Thu, 14 Apr 2016 04:47:49 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java
deleted file mode 100644
index 689c387..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SerializableMatchers.java
+++ /dev/null
@@ -1,1181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.ListCoder;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.MoreObjects;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * Static class for building and using {@link SerializableMatcher} instances.
- *
- * <p>Most matchers are wrappers for hamcrest's {@link Matchers}. Please be familiar with the
- * documentation there. Values retained by a {@link SerializableMatcher} are required to be
- * serializable, either via Java serialization or via a provided {@link Coder}.
- *
- * <p>The following matchers are novel to Dataflow:
- * <ul>
- * <li>{@link #kvWithKey} for matching just the key of a {@link KV}.
- * <li>{@link #kvWithValue} for matching just the value of a {@link KV}.
- * <li>{@link #kv} for matching the key and value of a {@link KV}.
- * </ul>
- *
- * <p>For example, to match a group from
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}, which has type
- * {@code KV<K, Iterable<V>>} for some {@code K} and {@code V} and where the order of the iterable
- * is undefined, use a matcher like
- * {@code kv(equalTo("some key"), containsInAnyOrder(1, 2, 3))}.
- */
-class SerializableMatchers implements Serializable {
-
-  // Serializable only because of capture by anonymous inner classes
-  private SerializableMatchers() { } // not instantiable
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#allOf(Iterable)}.
-   */
-  public static <T> SerializableMatcher<T>
-  allOf(Iterable<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final Iterable<Matcher<? super T>> matchers = (Iterable) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.allOf(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#allOf(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T> allOf(final SerializableMatcher<T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.allOf(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#anyOf(Iterable)}.
-   */
-  public static <T> SerializableMatcher<T>
-  anyOf(Iterable<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final Iterable<Matcher<? super T>> matchers = (Iterable) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.anyOf(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#anyOf(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T> anyOf(final SerializableMatcher<T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.anyOf(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#anything()}.
-   */
-  public static SerializableMatcher<Object> anything() {
-    return fromSupplier(new SerializableSupplier<Matcher<Object>>() {
-      @Override
-      public Matcher<Object> get() {
-        return Matchers.anything();
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContaining(Object[])}.
-   */
-  @SafeVarargs
-  public static <T extends Serializable> SerializableMatcher<T[]>
-  arrayContaining(final T... items) {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContaining(items);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContaining(Object[])}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}. They are
-   * explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T[]> arrayContaining(Coder<T> coder, T... items) {
-
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContaining(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContaining(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T[]>
-  arrayContaining(final SerializableMatcher<? super T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.<T>arrayContaining(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContaining(List)}.
-   */
-  public static <T> SerializableMatcher<T[]>
-  arrayContaining(List<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final List<Matcher<? super T>> matchers = (List) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContaining(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContainingInAnyOrder(Object[])}.
-   */
-  @SafeVarargs
-  public static <T extends Serializable> SerializableMatcher<T[]>
-  arrayContainingInAnyOrder(final T... items) {
-
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContainingInAnyOrder(items);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContainingInAnyOrder(Object[])}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}. They are
-   * explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T[]> arrayContainingInAnyOrder(Coder<T> coder, T... items) {
-
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContaining(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContainingInAnyOrder(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T[]> arrayContainingInAnyOrder(
-      final SerializableMatcher<? super T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.<T>arrayContainingInAnyOrder(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayContainingInAnyOrder(Collection)}.
-   */
-  public static <T> SerializableMatcher<T[]> arrayContainingInAnyOrder(
-      Collection<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final Collection<Matcher<? super T>> matchers = (Collection) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayContainingInAnyOrder(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayWithSize(int)}.
-   */
-  public static <T> SerializableMatcher<T[]> arrayWithSize(final int size) {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayWithSize(size);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#arrayWithSize(Matcher)}.
-   */
-  public static <T> SerializableMatcher<T[]> arrayWithSize(
-      final SerializableMatcher<? super Integer> sizeMatcher) {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.arrayWithSize(sizeMatcher);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#closeTo(double,double)}.
-   */
-  public static SerializableMatcher<Double> closeTo(final double target, final double error) {
-    return fromSupplier(new SerializableSupplier<Matcher<Double>>() {
-      @Override
-      public Matcher<Double> get() {
-        return Matchers.closeTo(target, error);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#contains(Object[])}.
-   */
-  @SafeVarargs
-  public static <T extends Serializable> SerializableMatcher<Iterable<? extends T>> contains(
-      final T... items) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.contains(items);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#contains(Object[])}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}. They are
-   * explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<Iterable<? extends T>>
-  contains(Coder<T> coder, T... items) {
-
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.containsInAnyOrder(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#contains(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<Iterable<? extends T>> contains(
-      final SerializableMatcher<? super T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.<T>contains(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#contains(List)}.
-   */
-  public static <T extends Serializable> SerializableMatcher<Iterable<? extends T>> contains(
-      List<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final List<Matcher<? super T>> matchers = (List) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.contains(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#containsInAnyOrder(Object[])}.
-   */
-  @SafeVarargs
-  public static <T extends Serializable> SerializableMatcher<Iterable<? extends T>>
-  containsInAnyOrder(final T... items) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.containsInAnyOrder(items);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#containsInAnyOrder(Object[])}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<Iterable<? extends T>>
-  containsInAnyOrder(Coder<T> coder, T... items) {
-
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.containsInAnyOrder(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#containsInAnyOrder(Matcher[])}.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<Iterable<? extends T>> containsInAnyOrder(
-      final SerializableMatcher<? super T>... matchers) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.<T>containsInAnyOrder(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#containsInAnyOrder(Collection)}.
-   */
-  public static <T> SerializableMatcher<Iterable<? extends T>> containsInAnyOrder(
-      Collection<SerializableMatcher<? super T>> serializableMatchers) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe covariant cast
-    final Collection<Matcher<? super T>> matchers = (Collection) serializableMatchers;
-
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.containsInAnyOrder(matchers);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#containsString}.
-   */
-  public static SerializableMatcher<String> containsString(final String substring) {
-    return fromSupplier(new SerializableSupplier<Matcher<String>>() {
-      @Override
-      public Matcher<String> get() {
-        return Matchers.containsString(substring);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#empty()}.
-   */
-  public static <T> SerializableMatcher<Collection<? extends T>> empty() {
-    return fromSupplier(new SerializableSupplier<Matcher<Collection<? extends T>>>() {
-      @Override
-      public Matcher<Collection<? extends T>> get() {
-        return Matchers.empty();
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#emptyArray()}.
-   */
-  public static <T> SerializableMatcher<T[]> emptyArray() {
-    return fromSupplier(new SerializableSupplier<Matcher<T[]>>() {
-      @Override
-      public Matcher<T[]> get() {
-        return Matchers.emptyArray();
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#emptyIterable()}.
-   */
-  public static <T> SerializableMatcher<Iterable<? extends T>> emptyIterable() {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? extends T>>>() {
-      @Override
-      public Matcher<Iterable<? extends T>> get() {
-        return Matchers.emptyIterable();
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#endsWith}.
-   */
-  public static SerializableMatcher<String> endsWith(final String substring) {
-    return fromSupplier(new SerializableSupplier<Matcher<String>>() {
-      @Override
-      public Matcher<String> get() {
-        return Matchers.endsWith(substring);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#equalTo()}.
-   */
-  public static <T extends Serializable> SerializableMatcher<T> equalTo(final T expected) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.equalTo(expected);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#equalTo()}.
-   *
-   * <p>The expected value of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T> SerializableMatcher<T> equalTo(Coder<T> coder, T expected) {
-
-    final SerializableSupplier<T> expectedSupplier = new SerializableViaCoder<>(coder, expected);
-
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.equalTo(expectedSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#greaterThan()}.
-   */
-  public static <T extends Comparable<T> & Serializable> SerializableMatcher<T>
-  greaterThan(final T target) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.greaterThan(target);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#greaterThan()}.
-   *
-   * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T extends Comparable<T> & Serializable> SerializableMatcher<T>
-  greaterThan(final Coder<T> coder, T target) {
-    final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.greaterThan(targetSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#greaterThanOrEqualTo()}.
-   */
-  public static <T extends Comparable<T>> SerializableMatcher<T> greaterThanOrEqualTo(
-      final T target) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.greaterThanOrEqualTo(target);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#greaterThanOrEqualTo()}.
-   *
-   * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T extends Comparable<T> & Serializable> SerializableMatcher<T>
-  greaterThanOrEqualTo(final Coder<T> coder, T target) {
-    final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.greaterThanOrEqualTo(targetSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#hasItem(Object)}.
-   */
-  public static <T extends Serializable> SerializableMatcher<Iterable<? super T>> hasItem(
-      final T target) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? super T>>>() {
-      @Override
-      public Matcher<Iterable<? super T>> get() {
-        return Matchers.hasItem(target);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#hasItem(Object)}.
-   *
-   * <p>The item of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T> SerializableMatcher<Iterable<? super T>> hasItem(Coder<T> coder, T target) {
-    final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? super T>>>() {
-      @Override
-      public Matcher<Iterable<? super T>> get() {
-        return Matchers.hasItem(targetSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#hasItem(Matcher)}.
-   */
-  public static <T> SerializableMatcher<Iterable<? super T>> hasItem(
-      final SerializableMatcher<? super T> matcher) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<? super T>>>() {
-      @Override
-      public Matcher<Iterable<? super T>> get() {
-        return Matchers.hasItem(matcher);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#hasSize(int)}.
-   */
-  public static <T> SerializableMatcher<Collection<? extends T>> hasSize(final int size) {
-    return fromSupplier(new SerializableSupplier<Matcher<Collection<? extends T>>>() {
-      @Override
-      public Matcher<Collection<? extends T>> get() {
-        return Matchers.hasSize(size);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#hasSize(Matcher)}.
-   */
-  public static <T> SerializableMatcher<Collection<? extends T>> hasSize(
-      final SerializableMatcher<? super Integer> sizeMatcher) {
-    return fromSupplier(new SerializableSupplier<Matcher<Collection<? extends T>>>() {
-      @Override
-      public Matcher<Collection<? extends T>> get() {
-        return Matchers.hasSize(sizeMatcher);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#iterableWithSize(int)}.
-   */
-  public static <T> SerializableMatcher<Iterable<T>> iterableWithSize(final int size) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<T>>>() {
-      @Override
-      public Matcher<Iterable<T>> get() {
-        return Matchers.iterableWithSize(size);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#iterableWithSize(Matcher)}.
-   */
-  public static <T> SerializableMatcher<Iterable<T>> iterableWithSize(
-      final SerializableMatcher<? super Integer> sizeMatcher) {
-    return fromSupplier(new SerializableSupplier<Matcher<Iterable<T>>>() {
-      @Override
-      public Matcher<Iterable<T>> get() {
-        return Matchers.iterableWithSize(sizeMatcher);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isIn(Collection)}.
-   */
-  public static <T extends Serializable> SerializableMatcher<T>
-  isIn(final Collection<T> collection) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isIn(collection);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isIn(Collection)}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
-   * They are explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T> SerializableMatcher<T> isIn(Coder<T> coder, Collection<T> collection) {
-    @SuppressWarnings("unchecked")
-    T[] items = (T[]) collection.toArray();
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isIn(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isIn(Object[])}.
-   */
-  public static <T extends Serializable> SerializableMatcher<T> isIn(final T[] items) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isIn(items);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isIn(Object[])}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
-   * They are explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T> SerializableMatcher<T> isIn(Coder<T> coder, T[] items) {
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isIn(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isOneOf}.
-   */
-  @SafeVarargs
-  public static <T extends Serializable> SerializableMatcher<T> isOneOf(final T... elems) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isOneOf(elems);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#isOneOf}.
-   *
-   * <p>The items of type {@code T} will be serialized using the provided {@link Coder}.
-   * They are explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  @SafeVarargs
-  public static <T> SerializableMatcher<T> isOneOf(Coder<T> coder, T... items) {
-    final SerializableSupplier<T[]> itemsSupplier =
-        new SerializableArrayViaCoder<>(coder, items);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.isOneOf(itemsSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with the specified key.
-   */
-  public static <K extends Serializable, V> SerializableMatcher<KV<? extends K, ? extends V>>
-  kvWithKey(K key) {
-    return new KvKeyMatcher<K, V>(equalTo(key));
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with the specified key.
-   *
-   * <p>The key of type {@code K} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>>
-  kvWithKey(Coder<K> coder, K key) {
-    return new KvKeyMatcher<K, V>(equalTo(coder, key));
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with matching key.
-   */
-  public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>> kvWithKey(
-      final SerializableMatcher<? super K> keyMatcher) {
-    return new KvKeyMatcher<K, V>(keyMatcher);
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with the specified value.
-   */
-  public static <K, V extends Serializable> SerializableMatcher<KV<? extends K, ? extends V>>
-  kvWithValue(V value) {
-    return new KvValueMatcher<K, V>(equalTo(value));
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with the specified value.
-   *
-   * <p>The value of type {@code V} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>>
-  kvWithValue(Coder<V> coder, V value) {
-    return new KvValueMatcher<K, V>(equalTo(coder, value));
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with matching value.
-   */
-  public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>> kvWithValue(
-      final SerializableMatcher<? super V> valueMatcher) {
-    return new KvValueMatcher<>(valueMatcher);
-  }
-
-  /**
-   * A {@link SerializableMatcher} that matches any {@link KV} with matching key and value.
-   */
-  public static <K, V> SerializableMatcher<KV<? extends K, ? extends V>> kv(
-      final SerializableMatcher<? super K> keyMatcher,
-      final SerializableMatcher<? super V> valueMatcher) {
-
-    return SerializableMatchers.<KV<? extends K, ? extends V>>allOf(
-        SerializableMatchers.<K, V>kvWithKey(keyMatcher),
-        SerializableMatchers.<K, V>kvWithValue(valueMatcher));
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#lessThan()}.
-   */
-  public static <T extends Comparable<T> & Serializable> SerializableMatcher<T> lessThan(
-      final T target) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.lessThan(target);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#lessThan()}.
-   *
-   * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T extends Comparable<T>> SerializableMatcher<T>
-  lessThan(Coder<T> coder, T target) {
-    final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.lessThan(targetSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#lessThanOrEqualTo()}.
-   */
-  public static <T extends Comparable<T> & Serializable> SerializableMatcher<T> lessThanOrEqualTo(
-      final T target) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.lessThanOrEqualTo(target);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#lessThanOrEqualTo()}.
-   *
-   * <p>The target value of type {@code T} will be serialized using the provided {@link Coder}.
-   * It is explicitly <i>not</i> required or expected to be serializable via Java serialization.
-   */
-  public static <T extends Comparable<T>> SerializableMatcher<T> lessThanOrEqualTo(
-      Coder<T> coder, T target) {
-    final SerializableSupplier<T> targetSupplier = new SerializableViaCoder<>(coder, target);
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.lessThanOrEqualTo(targetSupplier.get());
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#not}.
-   */
-  public static <T> SerializableMatcher<T> not(final SerializableMatcher<T> matcher) {
-    return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-      @Override
-      public Matcher<T> get() {
-        return Matchers.not(matcher);
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to
-   * {@link Matchers#nullValue}.
-   */
-  public static SerializableMatcher<Object> nullValue() {
-    return fromSupplier(new SerializableSupplier<Matcher<Object>>() {
-      @Override
-      public Matcher<Object> get() {
-        return Matchers.nullValue();
-      }
-    });
-  }
-
-  /**
-   * A {@link SerializableMatcher} with identical criteria to {@link Matchers#startsWith}.
-   */
-  public static SerializableMatcher<String> startsWith(final String substring) {
-    return fromSupplier(new SerializableSupplier<Matcher<String>>() {
-      @Override
-      public Matcher<String> get() {
-        return Matchers.startsWith(substring);
-      }
-    });
-  }
-
-  private static class KvKeyMatcher<K, V>
-  extends BaseMatcher<KV<? extends K, ? extends V>>
-  implements SerializableMatcher<KV<? extends K, ? extends V>> {
-    private final SerializableMatcher<? super K> keyMatcher;
-
-    public KvKeyMatcher(SerializableMatcher<? super K> keyMatcher) {
-      this.keyMatcher = keyMatcher;
-    }
-
-    @Override
-    public boolean matches(Object item) {
-      @SuppressWarnings("unchecked")
-      KV<K, ?> kvItem = (KV<K, ?>) item;
-      return keyMatcher.matches(kvItem.getKey());
-    }
-
-    @Override
-    public void describeMismatch(Object item, Description mismatchDescription) {
-      @SuppressWarnings("unchecked")
-      KV<K, ?> kvItem = (KV<K, ?>) item;
-      if (!keyMatcher.matches(kvItem.getKey())) {
-        mismatchDescription.appendText("key did not match: ");
-        keyMatcher.describeMismatch(kvItem.getKey(), mismatchDescription);
-      }
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("KV with key matching ");
-      keyMatcher.describeTo(description);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .addValue(keyMatcher)
-          .toString();
-    }
-  }
-
-  private static class KvValueMatcher<K, V>
-  extends BaseMatcher<KV<? extends K, ? extends V>>
-  implements SerializableMatcher<KV<? extends K, ? extends V>> {
-    private final SerializableMatcher<? super V> valueMatcher;
-
-    public KvValueMatcher(SerializableMatcher<? super V> valueMatcher) {
-      this.valueMatcher = valueMatcher;
-    }
-
-    @Override
-    public boolean matches(Object item) {
-      @SuppressWarnings("unchecked")
-      KV<?, V> kvItem = (KV<?, V>) item;
-      return valueMatcher.matches(kvItem.getValue());
-    }
-
-    @Override
-    public void describeMismatch(Object item, Description mismatchDescription) {
-      @SuppressWarnings("unchecked")
-      KV<?, V> kvItem = (KV<?, V>) item;
-      if (!valueMatcher.matches(kvItem.getValue())) {
-        mismatchDescription.appendText("value did not match: ");
-        valueMatcher.describeMismatch(kvItem.getValue(), mismatchDescription);
-      }
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("KV with value matching ");
-      valueMatcher.describeTo(description);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .addValue(valueMatcher)
-          .toString();
-    }
-  }
-
-  /**
-   * Constructs a {@link SerializableMatcher} from a non-serializable {@link Matcher} via
-   * indirection through {@link SerializableSupplier}.
-   *
-   * <p>To wrap a {@link Matcher} which is not serializable, provide a {@link SerializableSupplier}
-   * with a {@link SerializableSupplier#get()} method that returns a fresh instance of the
-   * {@link Matcher} desired. The resulting {@link SerializableMatcher} will behave according to
-   * the {@link Matcher} returned by {@link SerializableSupplier#get() get()} when it is invoked
-   * during matching (which may occur on another machine, such as a Dataflow worker).
-   *
-   * <code>
-   * return fromSupplier(new SerializableSupplier<Matcher<T>>() {
-   *   *     @Override
-   *     public Matcher<T> get() {
-   *       return new MyMatcherForT();
-   *     }
-   * });
-   * </code>
-   */
-  public static <T> SerializableMatcher<T> fromSupplier(
-      SerializableSupplier<Matcher<T>> supplier) {
-    return new SerializableMatcherFromSupplier<>(supplier);
-  }
-
-  /**
-   * Supplies values of type {@code T}, and is serializable. Thus, even if {@code T} is not
-   * serializable, the supplier can be serialized and provide a {@code T} wherever it is
-   * deserialized.
-   *
-   * @param <T> the type of value supplied.
-   */
-  public interface SerializableSupplier<T> extends Serializable {
-    T get();
-  }
-
-  /**
-   * Since the delegate {@link Matcher} is not generally serializable, instead this takes a nullary
-   * SerializableFunction to return such a matcher.
-   */
-  private static class SerializableMatcherFromSupplier<T> extends BaseMatcher<T>
-  implements SerializableMatcher<T> {
-
-    private SerializableSupplier<Matcher<T>> supplier;
-
-    public SerializableMatcherFromSupplier(SerializableSupplier<Matcher<T>> supplier) {
-      this.supplier = supplier;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      supplier.get().describeTo(description);
-    }
-
-    @Override
-    public boolean matches(Object item) {
-      return supplier.get().matches(item);
-    }
-
-    @Override
-    public void describeMismatch(Object item, Description mismatchDescription) {
-      supplier.get().describeMismatch(item, mismatchDescription);
-    }
-  }
-
-  /**
-   * Wraps any value that can be encoded via a {@link Coder} to make it {@link Serializable}.
-   * This is not likely to be a good encoding, so should be used only for tests, where data
-   * volume is small and minor costs are not critical.
-   */
-  private static class SerializableViaCoder<T> implements SerializableSupplier<T> {
-    /** Cached value that is not serialized. */
-    @Nullable
-    private transient T value;
-
-    /** The bytes of {@link #value} when encoded via {@link #coder}. */
-    private byte[] encodedValue;
-
-    private Coder<T> coder;
-
-    public SerializableViaCoder(Coder<T> coder, T value) {
-      this.coder = coder;
-      this.value = value;
-      try {
-        this.encodedValue = CoderUtils.encodeToByteArray(coder, value);
-      } catch (CoderException exc) {
-        throw new RuntimeException("Error serializing via Coder", exc);
-      }
-    }
-
-    @Override
-    public T get() {
-      if (value == null) {
-        try {
-          value = CoderUtils.decodeFromByteArray(coder, encodedValue);
-        } catch (CoderException exc) {
-          throw new RuntimeException("Error deserializing via Coder", exc);
-        }
-      }
-      return value;
-    }
-  }
-
-  /**
-   * Wraps any array with values that can be encoded via a {@link Coder} to make it
-   * {@link Serializable}. This is not likely to be a good encoding, so should be used only for
-   * tests, where data volume is small and minor costs are not critical.
-   */
-  private static class SerializableArrayViaCoder<T> implements SerializableSupplier<T[]> {
-    /** Cached value that is not serialized. */
-    @Nullable
-    private transient T[] value;
-
-    /** The bytes of {@link #value} when encoded via {@link #coder}. */
-    private byte[] encodedValue;
-
-    private Coder<List<T>> coder;
-
-    public SerializableArrayViaCoder(Coder<T> elementCoder, T[] value) {
-      this.coder = ListCoder.of(elementCoder);
-      this.value = value;
-      try {
-        this.encodedValue = CoderUtils.encodeToByteArray(coder, Arrays.asList(value));
-      } catch (CoderException exc) {
-        throw UserCodeException.wrap(exc);
-      }
-    }
-
-    @Override
-    public T[] get() {
-      if (value == null) {
-        try {
-          @SuppressWarnings("unchecked")
-          T[] decoded = (T[]) CoderUtils.decodeFromByteArray(coder, encodedValue).toArray();
-          value = decoded;
-        } catch (CoderException exc) {
-          throw new RuntimeException("Error deserializing via Coder", exc);
-        }
-      }
-      return value;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
deleted file mode 100644
index feff16a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
+++ /dev/null
@@ -1,676 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.testing;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.Source;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.values.KV;
-
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Helper functions and test harnesses for checking correctness of {@link Source}
- * implementations.
- *
- * <p>Contains a few lightweight utilities (e.g. reading items from a source or a reader,
- * such as {@link #readFromSource} and {@link #readFromUnstartedReader}), as well as
- * heavyweight property testing and stress testing harnesses that help getting a large
- * amount of test coverage with few code. Most notable ones are:
- * <ul>
- *   <li>{@link #assertSourcesEqualReferenceSource} helps testing that the data read
- *   by the union of sources produced by {@link BoundedSource#splitIntoBundles}
- *   is the same as data read by the original source.
- *   <li>If your source implements dynamic work rebalancing, use the
- *   {@code assertSplitAtFraction} family of functions - they test behavior of
- *   {@link BoundedSource.BoundedReader#splitAtFraction}, in particular, that
- *   various consistency properties are respected and the total set of data read
- *   by the source is preserved when splits happen.
- *   Use {@link #assertSplitAtFractionBehavior} to test individual cases
- *   of {@code splitAtFraction} and use {@link #assertSplitAtFractionExhaustive}
- *   as a heavy-weight stress test including concurrency. We strongly recommend to
- *   use both.
- * </ul>
- * For example usages, see the unit tests of classes such as
- * {@link com.google.cloud.dataflow.sdk.io.AvroSource} or
- * {@link com.google.cloud.dataflow.sdk.io.XmlSource}.
- *
- * <p>Like {@link PAssert}, requires JUnit and Hamcrest to be present in the classpath.
- */
-public class SourceTestUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(SourceTestUtils.class);
-
-  // A wrapper around a value of type T that compares according to the structural
-  // value provided by a Coder<T>, but prints both the original and structural value,
-  // to help get good error messages from JUnit equality assertion failures and such.
-  private static class ReadableStructuralValue<T> {
-    private T originalValue;
-    private Object structuralValue;
-
-    public ReadableStructuralValue(T originalValue, Object structuralValue) {
-      this.originalValue = originalValue;
-      this.structuralValue = structuralValue;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(structuralValue);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == null || !(obj instanceof ReadableStructuralValue)) {
-        return false;
-      }
-      return Objects.equals(structuralValue, ((ReadableStructuralValue) obj).structuralValue);
-    }
-
-    @Override
-    public String toString() {
-      return String.format("[%s (structural %s)]", originalValue, structuralValue);
-    }
-  }
-
-  /**
-   * Testing utilities below depend on standard assertions and matchers to compare elements read by
-   * sources. In general the elements may not implement {@code equals}/{@code hashCode} properly,
-   * however every source has a {@link Coder} and every {@code Coder} can
-   * produce a {@link Coder#structuralValue} whose {@code equals}/{@code hashCode} is
-   * consistent with equality of encoded format.
-   * So we use this {@link Coder#structuralValue} to compare elements read by sources.
-   */
-  public static <T> List<ReadableStructuralValue<T>> createStructuralValues(
-      Coder<T> coder, List<T> list)
-      throws Exception {
-    List<ReadableStructuralValue<T>> result = new ArrayList<>();
-    for (T elem : list) {
-      result.add(new ReadableStructuralValue<>(elem, coder.structuralValue(elem)));
-    }
-    return result;
-  }
-
-  /**
-   * Reads all elements from the given {@link BoundedSource}.
-   */
-  public static <T> List<T> readFromSource(BoundedSource<T> source, PipelineOptions options)
-      throws IOException {
-    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
-      return readFromUnstartedReader(reader);
-    }
-  }
-
-  /**
-   * Reads all elements from the given unstarted {@link Source.Reader}.
-   */
-  public static <T> List<T> readFromUnstartedReader(Source.Reader<T> reader) throws IOException {
-    return readRemainingFromReader(reader, false);
-  }
-
-  /**
-   * Reads all elements from the given started {@link Source.Reader}.
-   */
-  public static <T> List<T> readFromStartedReader(Source.Reader<T> reader) throws IOException {
-    return readRemainingFromReader(reader, true);
-  }
-
-  /**
-   * Read elements from a {@link Source.Reader} until n elements are read.
-   */
-  public static <T> List<T> readNItemsFromUnstartedReader(Source.Reader<T> reader, int n)
-      throws IOException {
-    return readNItemsFromReader(reader, n, false);
-  }
-
-  /**
-   * Read elements from a {@link Source.Reader} that has already had {@link Source.Reader#start}
-   * called on it, until n elements are read.
-   */
-  public static <T> List<T> readNItemsFromStartedReader(Source.Reader<T> reader, int n)
-      throws IOException {
-    return readNItemsFromReader(reader, n, true);
-  }
-
-  /**
-   * Read elements from a {@link Source.Reader} until n elements are read.
-   *
-   * <p>There must be at least n elements remaining in the reader, except for
-   * the case when n is {@code Integer.MAX_VALUE}, which means "read all
-   * remaining elements".
-   */
-  private static <T> List<T> readNItemsFromReader(Source.Reader<T> reader, int n, boolean started)
-      throws IOException {
-    List<T> res = new ArrayList<>();
-    for (int i = 0; i < n; i++) {
-      boolean shouldStart = (i == 0 && !started);
-      boolean more = shouldStart ? reader.start() : reader.advance();
-      if (n != Integer.MAX_VALUE) {
-        assertTrue(more);
-      }
-      if (!more) {
-        break;
-      }
-      res.add(reader.getCurrent());
-    }
-    return res;
-  }
-
-  /**
-   * Read all remaining elements from a {@link Source.Reader}.
-   */
-  public static <T> List<T> readRemainingFromReader(Source.Reader<T> reader, boolean started)
-      throws IOException {
-    return readNItemsFromReader(reader, Integer.MAX_VALUE, started);
-  }
-
-  /**
-   * Given a reference {@code Source} and a list of {@code Source}s, assert that the union of
-   * the records read from the list of sources is equal to the records read from the reference
-   * source.
-   */
-  public static <T> void assertSourcesEqualReferenceSource(
-      BoundedSource<T> referenceSource,
-      List<? extends BoundedSource<T>> sources,
-      PipelineOptions options)
-      throws Exception {
-    Coder<T> coder = referenceSource.getDefaultOutputCoder();
-    List<T> referenceRecords = readFromSource(referenceSource, options);
-    List<T> bundleRecords = new ArrayList<>();
-    for (BoundedSource<T> source : sources) {
-      assertThat(
-          "Coder type for source "
-              + source
-              + " is not compatible with Coder type for referenceSource "
-              + referenceSource,
-          source.getDefaultOutputCoder(),
-          equalTo(coder));
-      List<T> elems = readFromSource(source, options);
-      bundleRecords.addAll(elems);
-    }
-    List<ReadableStructuralValue<T>> bundleValues =
-        createStructuralValues(coder, bundleRecords);
-    List<ReadableStructuralValue<T>> referenceValues =
-        createStructuralValues(coder, referenceRecords);
-    assertThat(bundleValues, containsInAnyOrder(referenceValues.toArray()));
-  }
-
-  /**
-   * Assert that a {@code Reader} returns a {@code Source} that, when read from, produces the same
-   * records as the reader.
-   */
-  public static <T> void assertUnstartedReaderReadsSameAsItsSource(
-      BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws Exception {
-    Coder<T> coder = reader.getCurrentSource().getDefaultOutputCoder();
-    List<T> expected = readFromUnstartedReader(reader);
-    List<T> actual = readFromSource(reader.getCurrentSource(), options);
-    List<ReadableStructuralValue<T>> expectedStructural = createStructuralValues(coder, expected);
-    List<ReadableStructuralValue<T>> actualStructural = createStructuralValues(coder, actual);
-    assertThat(actualStructural, containsInAnyOrder(expectedStructural.toArray()));
-  }
-
-  /**
-   * Expected outcome of
-   * {@link com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader#splitAtFraction}.
-   */
-  public enum ExpectedSplitOutcome {
-    /**
-     * The operation must succeed and the results must be consistent.
-     */
-    MUST_SUCCEED_AND_BE_CONSISTENT,
-    /**
-     * The operation must fail (return {@code null}).
-     */
-    MUST_FAIL,
-    /**
-     * The operation must either fail, or succeed and the results be consistent.
-     */
-    MUST_BE_CONSISTENT_IF_SUCCEEDS
-  }
-
-  /**
-   * Contains two values: the number of items in the primary source, and the number of items in
-   * the residual source, -1 if split failed.
-   */
-  private static class SplitAtFractionResult {
-    public int numPrimaryItems;
-    public int numResidualItems;
-
-    public SplitAtFractionResult(int numPrimaryItems, int numResidualItems) {
-      this.numPrimaryItems = numPrimaryItems;
-      this.numResidualItems = numResidualItems;
-    }
-  }
-
-  /**
-   * Asserts that the {@code source}'s reader either fails to {@code splitAtFraction(fraction)}
-   * after reading {@code numItemsToReadBeforeSplit} items, or succeeds in a way that is
-   * consistent according to {@link #assertSplitAtFractionSucceedsAndConsistent}.
-   * <p> Returns SplitAtFractionResult.
-   */
-
-  public static <T> SplitAtFractionResult assertSplitAtFractionBehavior(
-      BoundedSource<T> source,
-      int numItemsToReadBeforeSplit,
-      double splitFraction,
-      ExpectedSplitOutcome expectedOutcome,
-      PipelineOptions options)
-      throws Exception {
-    return assertSplitAtFractionBehaviorImpl(
-        source, readFromSource(source, options), numItemsToReadBeforeSplit, splitFraction,
-        expectedOutcome, options);
-  }
-
-  /**
-   * Compares two lists elementwise and throws a detailed assertion failure optimized for
-   * human reading in case they are unequal.
-   */
-  private static <T> void assertListsEqualInOrder(
-      String message, String expectedLabel, List<T> expected, String actualLabel, List<T> actual) {
-    int i = 0;
-    for (; i < expected.size() && i < actual.size(); ++i) {
-      if (!Objects.equals(expected.get(i), actual.get(i))) {
-        Assert.fail(String.format(
-            "%s: %s and %s have %d items in common and then differ. "
-            + "Item in %s (%d more): %s, item in %s (%d more): %s",
-            message, expectedLabel, actualLabel, i,
-            expectedLabel, expected.size() - i - 1, expected.get(i),
-            actualLabel, actual.size() - i - 1, actual.get(i)));
-      }
-    }
-    if (i < expected.size() /* but i == actual.size() */) {
-      Assert.fail(String.format(
-          "%s: %s has %d more items after matching all %d from %s. First 5: %s",
-          message, expectedLabel, expected.size() - actual.size(), actual.size(), actualLabel,
-          expected.subList(actual.size(), Math.min(expected.size(), actual.size() + 5))));
-    } else if (i < actual.size() /* but i == expected.size() */) {
-      Assert.fail(String.format(
-          "%s: %s has %d more items after matching all %d from %s. First 5: %s",
-          message, actualLabel, actual.size() - expected.size(), expected.size(), expectedLabel,
-          actual.subList(expected.size(), Math.min(actual.size(), expected.size() + 5))));
-    } else {
-      // All is well.
-    }
-  }
-
-  private static <T> SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBehaviorImpl(
-      BoundedSource<T> source, List<T> expectedItems, int numItemsToReadBeforeSplit,
-      double splitFraction, ExpectedSplitOutcome expectedOutcome, PipelineOptions options)
-      throws Exception {
-    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
-      BoundedSource<T> originalSource = reader.getCurrentSource();
-      List<T> currentItems = readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplit);
-      BoundedSource<T> residual = reader.splitAtFraction(splitFraction);
-      if (residual != null) {
-        assertFalse(
-            String.format(
-                "Primary source didn't change after a successful split of %s at %f "
-                + "after reading %d items. "
-                + "Was the source object mutated instead of creating a new one? "
-                + "Source objects MUST be immutable.",
-                source, splitFraction, numItemsToReadBeforeSplit),
-            reader.getCurrentSource() == originalSource);
-        assertFalse(
-            String.format(
-                "Residual source equal to original source after a successful split of %s at %f "
-                + "after reading %d items. "
-                + "Was the source object mutated instead of creating a new one? "
-                + "Source objects MUST be immutable.",
-                source, splitFraction, numItemsToReadBeforeSplit),
-            reader.getCurrentSource() == residual);
-      }
-      // Failure cases are: must succeed but fails; must fail but succeeds.
-      switch (expectedOutcome) {
-        case MUST_SUCCEED_AND_BE_CONSISTENT:
-          assertNotNull(
-              "Failed to split reader of source: "
-                  + source
-                  + " at "
-                  + splitFraction
-                  + " after reading "
-                  + numItemsToReadBeforeSplit
-                  + " items",
-              residual);
-          break;
-        case MUST_FAIL:
-          assertEquals(null, residual);
-          break;
-        case MUST_BE_CONSISTENT_IF_SUCCEEDS:
-          // Nothing.
-          break;
-      }
-      currentItems.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplit > 0));
-      BoundedSource<T> primary = reader.getCurrentSource();
-      return verifySingleSplitAtFractionResult(
-          source, expectedItems, currentItems, primary, residual,
-          numItemsToReadBeforeSplit, splitFraction, options);
-    }
-  }
-
-  private static <T> SourceTestUtils.SplitAtFractionResult verifySingleSplitAtFractionResult(
-      BoundedSource<T> source, List<T> expectedItems, List<T> currentItems,
-      BoundedSource<T> primary, BoundedSource<T> residual,
-      int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options)
-      throws Exception {
-    List<T> primaryItems = readFromSource(primary, options);
-    if (residual != null) {
-      List<T> residualItems = readFromSource(residual, options);
-      List<T> totalItems = new ArrayList<>();
-      totalItems.addAll(primaryItems);
-      totalItems.addAll(residualItems);
-      String errorMsgForPrimarySourceComp =
-          String.format(
-              "Continued reading after split yielded different items than primary source: "
-                  + "split at %s after reading %s items, original source: %s, primary source: %s",
-              splitFraction,
-              numItemsToReadBeforeSplit,
-              source,
-              primary);
-      String errorMsgForTotalSourceComp =
-          String.format(
-              "Items in primary and residual sources after split do not add up to items "
-                  + "in the original source. Split at %s after reading %s items; "
-                  + "original source: %s, primary: %s, residual: %s",
-              splitFraction,
-              numItemsToReadBeforeSplit,
-              source,
-              primary,
-              residual);
-      Coder<T> coder = primary.getDefaultOutputCoder();
-      List<ReadableStructuralValue<T>> primaryValues =
-          createStructuralValues(coder, primaryItems);
-      List<ReadableStructuralValue<T>> currentValues =
-          createStructuralValues(coder, currentItems);
-      List<ReadableStructuralValue<T>> expectedValues =
-          createStructuralValues(coder, expectedItems);
-      List<ReadableStructuralValue<T>> totalValues =
-          createStructuralValues(coder, totalItems);
-      assertListsEqualInOrder(
-          errorMsgForPrimarySourceComp, "current", currentValues, "primary", primaryValues);
-      assertListsEqualInOrder(
-          errorMsgForTotalSourceComp, "total", expectedValues, "primary+residual", totalValues);
-      return new SplitAtFractionResult(primaryItems.size(), residualItems.size());
-    }
-    return new SplitAtFractionResult(primaryItems.size(), -1);
-  }
-
-  /**
-   * Verifies some consistency properties of
-   * {@link BoundedSource.BoundedReader#splitAtFraction} on the given source. Equivalent to
-   * the following pseudocode:
-   * <pre>
-   *   Reader reader = source.createReader();
-   *   read N items from reader;
-   *   Source residual = reader.splitAtFraction(splitFraction);
-   *   Source primary = reader.getCurrentSource();
-   *   assert: items in primary == items we read so far
-   *                               + items we'll get by continuing to read from reader;
-   *   assert: items in original source == items in primary + items in residual
-   * </pre>
-   */
-  public static <T> void assertSplitAtFractionSucceedsAndConsistent(
-      BoundedSource<T> source,
-      int numItemsToReadBeforeSplit,
-      double splitFraction,
-      PipelineOptions options)
-      throws Exception {
-    assertSplitAtFractionBehavior(
-        source,
-        numItemsToReadBeforeSplit,
-        splitFraction,
-        ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT,
-        options);
-  }
-
-  /**
-   * Asserts that the {@code source}'s reader fails to {@code splitAtFraction(fraction)}
-   * after reading {@code numItemsToReadBeforeSplit} items.
-   */
-  public static <T> void assertSplitAtFractionFails(
-      BoundedSource<T> source,
-      int numItemsToReadBeforeSplit,
-      double splitFraction,
-      PipelineOptions options)
-      throws Exception {
-    assertSplitAtFractionBehavior(
-        source, numItemsToReadBeforeSplit, splitFraction, ExpectedSplitOutcome.MUST_FAIL, options);
-  }
-
-  private static class SplitFractionStatistics {
-    List<Double> successfulFractions = new ArrayList<>();
-    List<Double> nonTrivialFractions = new ArrayList<>();
-  }
-
-  /**
-   * Asserts that given a start position,
-   * {@link BoundedSource.BoundedReader#splitAtFraction} at every interesting fraction (halfway
-   * between two fractions that differ by at least one item) can be called successfully and the
-   * results are consistent if a split succeeds.
-   */
-  private static <T> void assertSplitAtFractionBinary(
-      BoundedSource<T> source,
-      List<T> expectedItems,
-      int numItemsToBeReadBeforeSplit,
-      double leftFraction,
-      SplitAtFractionResult leftResult,
-      double rightFraction,
-      SplitAtFractionResult rightResult,
-      PipelineOptions options,
-      SplitFractionStatistics stats)
-      throws Exception {
-    if (rightFraction - leftFraction < 0.001) {
-      // Do not recurse too deeply. Otherwise we will end up in infinite
-      // recursion, e.g., while trying to find the exact minimal fraction s.t.
-      // split succeeds. A precision of 0.001 when looking for such a fraction
-      // ought to be enough for everybody.
-      return;
-    }
-    double middleFraction = (rightFraction + leftFraction) / 2;
-    if (leftResult == null) {
-      leftResult = assertSplitAtFractionBehaviorImpl(
-          source, expectedItems, numItemsToBeReadBeforeSplit, leftFraction,
-          ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-    }
-    if (rightResult == null) {
-      rightResult = assertSplitAtFractionBehaviorImpl(
-          source, expectedItems, numItemsToBeReadBeforeSplit, rightFraction,
-          ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-    }
-    SplitAtFractionResult middleResult = assertSplitAtFractionBehaviorImpl(
-        source, expectedItems, numItemsToBeReadBeforeSplit, middleFraction,
-        ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-    if (middleResult.numResidualItems != -1) {
-      stats.successfulFractions.add(middleFraction);
-    }
-    if (middleResult.numResidualItems > 0) {
-      stats.nonTrivialFractions.add(middleFraction);
-    }
-    // Two split fractions are equivalent if they yield the same number of
-    // items in primary vs. residual source. Left and right are already not
-    // equivalent. Recurse into [left, middle) and [right, middle) respectively
-    // if middle is not equivalent to left or right.
-    if (leftResult.numPrimaryItems != middleResult.numPrimaryItems) {
-      assertSplitAtFractionBinary(
-          source, expectedItems, numItemsToBeReadBeforeSplit,
-          leftFraction, leftResult, middleFraction, middleResult, options, stats);
-    }
-    if (rightResult.numPrimaryItems != middleResult.numPrimaryItems) {
-      assertSplitAtFractionBinary(
-          source, expectedItems, numItemsToBeReadBeforeSplit,
-          middleFraction, middleResult, rightFraction, rightResult, options, stats);
-    }
-  }
-
-  private static final int MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM = 100;
-  private static final int MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL = 1000;
-
-  /**
-   * Asserts that for each possible start position,
-   * {@link BoundedSource.BoundedReader#splitAtFraction} at every interesting fraction (halfway
-   * between two fractions that differ by at least one item) can be called successfully and the
-   * results are consistent if a split succeeds. Verifies multithreaded splitting as well.
-   */
-  public static <T> void assertSplitAtFractionExhaustive(
-      BoundedSource<T> source, PipelineOptions options) throws Exception {
-    List<T> expectedItems = readFromSource(source, options);
-    assertFalse("Empty source", expectedItems.isEmpty());
-    assertFalse("Source reads a single item", expectedItems.size() == 1);
-    List<List<Double>> allNonTrivialFractions = new ArrayList<>();
-    {
-      boolean anySuccessfulFractions = false;
-      boolean anyNonTrivialFractions = false;
-      for (int i = 0; i < expectedItems.size(); i++) {
-        SplitFractionStatistics stats = new SplitFractionStatistics();
-        assertSplitAtFractionBinary(source, expectedItems, i,
-            0.0, null, 1.0, null, options, stats);
-        if (!stats.successfulFractions.isEmpty()) {
-          anySuccessfulFractions = true;
-        }
-        if (!stats.nonTrivialFractions.isEmpty()) {
-          anyNonTrivialFractions = true;
-        }
-        allNonTrivialFractions.add(stats.nonTrivialFractions);
-      }
-      assertTrue(
-          "splitAtFraction test completed vacuously: no successful split fractions found",
-          anySuccessfulFractions);
-      assertTrue(
-          "splitAtFraction test completed vacuously: no non-trivial split fractions found",
-          anyNonTrivialFractions);
-    }
-    {
-      // Perform a stress test of "racy" concurrent splitting:
-      // for every position (number of items read), try to split at the minimum nontrivial
-      // split fraction for that position concurrently with reading the record at that position.
-      // To ensure that the test is non-vacuous, make sure that the splitting succeeds
-      // at least once and fails at least once.
-      ExecutorService executor = Executors.newFixedThreadPool(2);
-      int numTotalTrials = 0;
-      for (int i = 0; i < expectedItems.size(); i++) {
-        double minNonTrivialFraction = 2.0;  // Greater than any possible fraction.
-        for (double fraction : allNonTrivialFractions.get(i)) {
-          minNonTrivialFraction = Math.min(minNonTrivialFraction, fraction);
-        }
-        if (minNonTrivialFraction == 2.0) {
-          // This will not happen all the time because otherwise the test above would
-          // detect vacuousness.
-          continue;
-        }
-        int numTrials = 0;
-        boolean haveSuccess = false, haveFailure = false;
-        while (true) {
-          ++numTrials;
-          if (numTrials > MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM) {
-            LOG.warn(
-                "After {} concurrent splitting trials at item #{}, observed only {}, "
-                + "giving up on this item",
-                numTrials, i, haveSuccess ? "success" : "failure");
-            break;
-          }
-          if (assertSplitAtFractionConcurrent(
-              executor, source, expectedItems, i, minNonTrivialFraction, options)) {
-            haveSuccess = true;
-          } else {
-            haveFailure = true;
-          }
-          if (haveSuccess && haveFailure) {
-            LOG.info(
-                "{} trials to observe both success and failure of concurrent splitting at item #{}",
-                numTrials, i);
-            break;
-          }
-        }
-        numTotalTrials += numTrials;
-        if (numTotalTrials > MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL) {
-          LOG.warn(
-              "After {} total concurrent splitting trials, considered only {} items, giving up.",
-              numTotalTrials, i);
-          break;
-        }
-      }
-      LOG.info(
-          "{} total concurrent splitting trials for {} items",
-          numTotalTrials, expectedItems.size());
-    }
-  }
-
-  private static <T> boolean assertSplitAtFractionConcurrent(
-      ExecutorService executor, BoundedSource<T> source, List<T> expectedItems,
-      final int numItemsToReadBeforeSplitting, final double fraction, PipelineOptions options)
-      throws Exception {
-    @SuppressWarnings("resource")  // Closed in readerThread
-    final BoundedSource.BoundedReader<T> reader = source.createReader(options);
-    final CountDownLatch unblockSplitter = new CountDownLatch(1);
-    Future<List<T>> readerThread =
-        executor.submit(
-            new Callable<List<T>>() {
-              @Override
-              public List<T> call() throws Exception {
-                try {
-                  List<T> items =
-                      readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplitting);
-                  unblockSplitter.countDown();
-                  items.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplitting > 0));
-                  return items;
-                } finally {
-                  reader.close();
-                }
-              }
-            });
-    Future<KV<BoundedSource<T>, BoundedSource<T>>> splitterThread = executor.submit(
-        new Callable<KV<BoundedSource<T>, BoundedSource<T>>>() {
-          @Override
-          public KV<BoundedSource<T>, BoundedSource<T>> call() throws Exception {
-            unblockSplitter.await();
-            BoundedSource<T> residual = reader.splitAtFraction(fraction);
-            if (residual == null) {
-              return null;
-            }
-            return KV.of(reader.getCurrentSource(), residual);
-          }
-        });
-    List<T> currentItems = readerThread.get();
-    KV<BoundedSource<T>, BoundedSource<T>> splitSources = splitterThread.get();
-    if (splitSources == null) {
-      return false;
-    }
-    SplitAtFractionResult res = verifySingleSplitAtFractionResult(
-        source, expectedItems, currentItems, splitSources.getKey(), splitSources.getValue(),
-        numItemsToReadBeforeSplitting, fraction, options);
-    return (res.numResidualItems > 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
deleted file mode 100644
index 17b1538..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
-import com.google.cloud.dataflow.sdk.options.GcpOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions.CheckEnabled;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.util.TestCredential;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.collect.Iterators;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.Iterator;
-
-import javax.annotation.Nullable;
-
-/**
- * A creator of test pipelines that can be used inside of tests that can be
- * configured to run locally or against a remote pipeline runner.
- *
- * <p>It is recommended to tag hand-selected tests for this purpose using the
- * {@link RunnableOnService} {@link Category} annotation, as each test run against a pipeline runner
- * will utilize resources of that pipeline runner.
- *
- * <p>In order to run tests on a pipeline runner, the following conditions must be met:
- * <ul>
- *   <li>System property "beamTestPipelineOptions" must contain a JSON delimited list of pipeline
- *   options. For example:
- *   <pre>{@code [
- *     "--runner=com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineRunner",
- *     "--project=mygcpproject",
- *     "--stagingLocation=gs://mygcsbucket/path"
- *     ]}</pre>
- *     Note that the set of pipeline options required is pipeline runner specific.
- *   </li>
- *   <li>Jars containing the SDK and test classes must be available on the classpath.</li>
- * </ul>
- *
- * <p>Use {@link PAssert} for tests, as it integrates with this test harness in both direct and
- * remote execution modes. For example:
- * <pre>{@code
- * Pipeline p = TestPipeline.create();
- * PCollection<Integer> output = ...
- *
- * PAssert.that(output)
- *     .containsInAnyOrder(1, 2, 3, 4);
- * p.run();
- * }</pre>
- *
- * <p>For pipeline runners, it is required that they must throw an {@link AssertionError}
- * containing the message from the {@link PAssert} that failed.
- */
-public class TestPipeline extends Pipeline {
-  private static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-
-  /**
-   * Creates and returns a new test pipeline.
-   *
-   * <p>Use {@link PAssert} to add tests, then call
-   * {@link Pipeline#run} to execute the pipeline and check the tests.
-   */
-  public static TestPipeline create() {
-    return fromOptions(testingPipelineOptions());
-  }
-
-  public static TestPipeline fromOptions(PipelineOptions options) {
-    return new TestPipeline(PipelineRunner.fromOptions(options), options);
-  }
-
-  private TestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions options) {
-    super(runner, options);
-  }
-
-  /**
-   * Runs this {@link TestPipeline}, unwrapping any {@code AssertionError}
-   * that is raised during testing.
-   */
-  @Override
-  public PipelineResult run() {
-    try {
-      return super.run();
-    } catch (RuntimeException exc) {
-      Throwable cause = exc.getCause();
-      if (cause instanceof AssertionError) {
-        throw (AssertionError) cause;
-      } else {
-        throw exc;
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName();
-  }
-
-  /**
-   * Creates {@link PipelineOptions} for testing.
-   */
-  public static PipelineOptions testingPipelineOptions() {
-    try {
-      @Nullable String beamTestPipelineOptions =
-          System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
-
-      PipelineOptions options =
-          Strings.isNullOrEmpty(beamTestPipelineOptions)
-              ? PipelineOptionsFactory.create()
-              : PipelineOptionsFactory.fromArgs(
-                      MAPPER.readValue(
-                          System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS), String[].class))
-                  .as(PipelineOptions.class);
-
-      options.as(ApplicationNameOptions.class).setAppName(getAppName());
-      // If no options were specified, use a test credential object on all pipelines.
-      if (Strings.isNullOrEmpty(beamTestPipelineOptions)) {
-        options.as(GcpOptions.class).setGcpCredential(new TestCredential());
-      }
-      options.setStableUniqueNames(CheckEnabled.ERROR);
-      return options;
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to instantiate test options from system property "
-          + PROPERTY_BEAM_TEST_PIPELINE_OPTIONS + ":"
-          + System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS), e);
-    }
-  }
-
-  /** Returns the class + method name of the test, or a default name. */
-  private static String getAppName() {
-    Optional<StackTraceElement> stackTraceElement = findCallersStackTrace();
-    if (stackTraceElement.isPresent()) {
-      String methodName = stackTraceElement.get().getMethodName();
-      String className = stackTraceElement.get().getClassName();
-      if (className.contains(".")) {
-        className = className.substring(className.lastIndexOf(".") + 1);
-      }
-      return className + "-" + methodName;
-    }
-    return "UnitTest";
-  }
-
-  /** Returns the {@link StackTraceElement} of the calling class. */
-  private static Optional<StackTraceElement> findCallersStackTrace() {
-    Iterator<StackTraceElement> elements =
-        Iterators.forArray(Thread.currentThread().getStackTrace());
-    // First find the TestPipeline class in the stack trace.
-    while (elements.hasNext()) {
-      StackTraceElement next = elements.next();
-      if (TestPipeline.class.getName().equals(next.getClassName())) {
-        break;
-      }
-    }
-    // Then find the first instance after that is not the TestPipeline
-    Optional<StackTraceElement> firstInstanceAfterTestPipeline = Optional.absent();
-    while (elements.hasNext()) {
-      StackTraceElement next = elements.next();
-      if (!TestPipeline.class.getName().equals(next.getClassName())) {
-        if (!firstInstanceAfterTestPipeline.isPresent()) {
-          firstInstanceAfterTestPipeline = Optional.of(next);
-        }
-        try {
-          Class<?> nextClass = Class.forName(next.getClassName());
-          for (Method method : nextClass.getMethods()) {
-            if (method.getName().equals(next.getMethodName())) {
-              if (method.isAnnotationPresent(org.junit.Test.class)) {
-                return Optional.of(next);
-              } else if (method.isAnnotationPresent(org.junit.Before.class)) {
-                break;
-              }
-            }
-          }
-        } catch (Throwable t) {
-          break;
-        }
-      }
-    }
-    return firstInstanceAfterTestPipeline;
-  }
-}


Mime
View raw message