beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [06/67] [partial] incubator-beam git commit: Directory reorganization
Date Thu, 24 Mar 2016 02:47:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ZipFiles.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ZipFiles.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ZipFiles.java
deleted file mode 100644
index 773b65f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ZipFiles.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterators;
-import com.google.common.io.ByteSource;
-import com.google.common.io.CharSource;
-import com.google.common.io.Closer;
-import com.google.common.io.Files;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-import java.util.zip.ZipOutputStream;
-
-/**
- * Functions for zipping a directory (including a subdirectory) into a ZIP-file
- * or unzipping it again.
- */
-public final class ZipFiles {
-  private ZipFiles() {}
-
-  /**
-   * Returns a new {@link ByteSource} for reading the contents of the given
-   * entry in the given zip file.
-   */
-  static ByteSource asByteSource(ZipFile file, ZipEntry entry) {
-    return new ZipEntryByteSource(file, entry);
-  }
-
-  /**
-   * Returns a new {@link CharSource} for reading the contents of the given
-   * entry in the given zip file as text using the given charset.
-   */
-  static CharSource asCharSource(
-      ZipFile file, ZipEntry entry, Charset charset) {
-    return asByteSource(file, entry).asCharSource(charset);
-  }
-
-  private static final class ZipEntryByteSource extends ByteSource {
-
-    private final ZipFile file;
-    private final ZipEntry entry;
-
-    ZipEntryByteSource(ZipFile file, ZipEntry entry) {
-      this.file = checkNotNull(file);
-      this.entry = checkNotNull(entry);
-    }
-
-    @Override
-    public InputStream openStream() throws IOException {
-      return file.getInputStream(entry);
-    }
-
-    // TODO: implement size() to try calling entry.getSize()?
-
-    @Override
-    public String toString() {
-      return "ZipFiles.asByteSource(" + file + ", " + entry + ")";
-    }
-  }
-
-  /**
-   * Returns a {@link FluentIterable} of all the entries in the given zip file.
-   */
-  // unmodifiable Iterator<? extends ZipEntry> can be safely cast
-  // to Iterator<ZipEntry>
-  @SuppressWarnings("unchecked")
-  static FluentIterable<ZipEntry> entries(final ZipFile file) {
-    checkNotNull(file);
-    return new FluentIterable<ZipEntry>() {
-      @Override
-      public Iterator<ZipEntry> iterator() {
-        return (Iterator<ZipEntry>) Iterators.forEnumeration(file.entries());
-      }
-    };
-  }
-
-  /**
-   * Unzips the zip file specified by the path and creates the directory structure <i>inside</i>
-   * the target directory. Refuses to unzip files that refer to a parent directory, for security
-   * reasons.
-   *
-   * @param zipFile the source zip-file to unzip
-   * @param targetDirectory the directory to unzip to. If the zip-file contains
-   *     any subdirectories, they will be created within our target directory.
-   * @throws IOException the unzipping failed, e.g. because the output was not writable, the {@code
-   *     zipFile} was not readable, or contains an illegal entry (contains "..", pointing outside
-   *     the target directory)
-   * @throws IllegalArgumentException the target directory is not a valid directory (e.g. does not
-   *     exist, or is a file instead of a directory)
-   */
-  static void unzipFile(
-      File zipFile,
-      File targetDirectory) throws IOException {
-    checkNotNull(zipFile);
-    checkNotNull(targetDirectory);
-    checkArgument(
-        targetDirectory.isDirectory(),
-        "%s is not a valid directory",
-        targetDirectory.getAbsolutePath());
-    final ZipFile zipFileObj = new ZipFile(zipFile);
-    try {
-      for (ZipEntry entry : entries(zipFileObj)) {
-        checkName(entry.getName());
-        File targetFile = new File(targetDirectory, entry.getName());
-        if (entry.isDirectory()) {
-          if (!targetFile.isDirectory() && !targetFile.mkdirs()) {
-            throw new IOException(
-                "Failed to create directory: " + targetFile.getAbsolutePath());
-          }
-        } else {
-          File parentFile = targetFile.getParentFile();
-          if (!parentFile.isDirectory()) {
-            if (!parentFile.mkdirs()) {
-              throw new IOException(
-                  "Failed to create directory: "
-                  + parentFile.getAbsolutePath());
-            }
-          }
-          // Write the file to the destination.
-          asByteSource(zipFileObj, entry).copyTo(Files.asByteSink(targetFile));
-        }
-      }
-    } finally {
-      zipFileObj.close();
-    }
-  }
-
-  /**
-   * Checks that the given entry name is legal for unzipping: if it contains
-   * ".." as a name element, it could cause the entry to be unzipped outside
-   * the directory we're unzipping to.
-   *
-   * @throws IOException if the name is illegal
-   */
-  private static void checkName(String name) throws IOException {
-    // First just check whether the entry name string contains "..".
-    // This should weed out the the vast majority of entries, which will not
-    // contain "..".
-    if (name.contains("..")) {
-      // If the string does contain "..", break it down into its actual name
-      // elements to ensure it actually contains ".." as a name, not just a
-      // name like "foo..bar" or even "foo..", which should be fine.
-      File file = new File(name);
-      while (file != null) {
-        if (file.getName().equals("..")) {
-          throw new IOException("Cannot unzip file containing an entry with "
-              + "\"..\" in the name: " + name);
-        }
-        file = file.getParentFile();
-      }
-    }
-  }
-
-  /**
-   * Zips an entire directory specified by the path.
-   *
-   * @param sourceDirectory the directory to read from. This directory and all
-   *     subdirectories will be added to the zip-file. The path within the zip
-   *     file is relative to the directory given as parameter, not absolute.
-   * @param zipFile the zip-file to write to.
-   * @throws IOException the zipping failed, e.g. because the input was not
-   *     readable.
-   */
-  static void zipDirectory(
-      File sourceDirectory,
-      File zipFile) throws IOException {
-    checkNotNull(sourceDirectory);
-    checkNotNull(zipFile);
-    checkArgument(
-        sourceDirectory.isDirectory(),
-        "%s is not a valid directory",
-        sourceDirectory.getAbsolutePath());
-    checkArgument(
-        !zipFile.exists(),
-        "%s does already exist, files are not being overwritten",
-        zipFile.getAbsolutePath());
-    Closer closer = Closer.create();
-    try {
-      OutputStream outputStream = closer.register(new BufferedOutputStream(
-          new FileOutputStream(zipFile)));
-      zipDirectory(sourceDirectory, outputStream);
-    } catch (Throwable t) {
-      throw closer.rethrow(t);
-    } finally {
-      closer.close();
-    }
-  }
-
-  /**
-   * Zips an entire directory specified by the path.
-   *
-   * @param sourceDirectory the directory to read from. This directory and all
-   *     subdirectories will be added to the zip-file. The path within the zip
-   *     file is relative to the directory given as parameter, not absolute.
-   * @param outputStream the stream to write the zip-file to. This method does not close
-   *     outputStream.
-   * @throws IOException the zipping failed, e.g. because the input was not
-   *     readable.
-   */
-  static void zipDirectory(
-      File sourceDirectory,
-      OutputStream outputStream) throws IOException {
-    checkNotNull(sourceDirectory);
-    checkNotNull(outputStream);
-    checkArgument(
-        sourceDirectory.isDirectory(),
-        "%s is not a valid directory",
-        sourceDirectory.getAbsolutePath());
-    ZipOutputStream zos = new ZipOutputStream(outputStream);
-    for (File file : sourceDirectory.listFiles()) {
-      zipDirectoryInternal(file, "", zos);
-    }
-    zos.finish();
-  }
-
-  /**
-   * Private helper function for zipping files. This one goes recursively
-   * through the input directory and all of its subdirectories and adds the
-   * single zip entries.
-   *
-   * @param inputFile the file or directory to be added to the zip file
-   * @param directoryName the string-representation of the parent directory
-   *     name. Might be an empty name, or a name containing multiple directory
-   *     names separated by "/". The directory name must be a valid name
-   *     according to the file system limitations. The directory name should be
-   *     empty or should end in "/".
-   * @param zos the zipstream to write to
-   * @throws IOException the zipping failed, e.g. because the output was not
-   *     writeable.
-   */
-  private static void zipDirectoryInternal(
-      File inputFile,
-      String directoryName,
-      ZipOutputStream zos) throws IOException {
-    String entryName = directoryName + inputFile.getName();
-    if (inputFile.isDirectory()) {
-      entryName += "/";
-
-      // We are hitting a sub-directory. Recursively add children to zip in deterministic,
-      // sorted order.
-      File[] childFiles = inputFile.listFiles();
-      if (childFiles.length > 0) {
-        Arrays.sort(childFiles);
-        // loop through the directory content, and zip the files
-        for (File file : childFiles) {
-          zipDirectoryInternal(file, entryName, zos);
-        }
-
-        // Since this directory has children, exit now without creating a zipentry specific to
-        // this directory. The entry for a non-entry directory is incompatible with certain
-        // implementations of unzip.
-        return;
-      }
-    }
-
-    // Put the zip-entry for this file or empty directory into the zipoutputstream.
-    ZipEntry entry = new ZipEntry(entryName);
-    entry.setTime(inputFile.lastModified());
-    zos.putNextEntry(entry);
-
-    // Copy file contents into zipoutput stream.
-    if (inputFile.isFile()) {
-      Files.asByteSource(inputFile).copyTo(zos);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Counter.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Counter.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Counter.java
deleted file mode 100644
index 2c1985c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Counter.java
+++ /dev/null
@@ -1,1103 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common;
-
-import static com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind.AND;
-import static com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind.MEAN;
-import static com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind.OR;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.util.concurrent.AtomicDouble;
-
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-/**
- * A Counter enables the aggregation of a stream of values over time.  The
- * cumulative aggregate value is updated as new values are added, or it can be
- * reset to a new value.  Multiple kinds of aggregation are supported depending
- * on the type of the counter.
- *
- * <p>Counters compare using value equality of their name, kind, and
- * cumulative value.  Equal counters should have equal toString()s.
- *
- * @param <T> the type of values aggregated by this counter
- */
-public abstract class Counter<T> {
-  /**
-   * Possible kinds of counter aggregation.
-   */
-  public static enum AggregationKind {
-
-    /**
-     * Computes the sum of all added values.
-     * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    SUM,
-
-    /**
-     * Computes the maximum value of all added values.
-     * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    MAX,
-
-    /**
-     * Computes the minimum value of all added values.
-     * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    MIN,
-
-    /**
-     * Computes the arithmetic mean of all added values.  Applicable to
-     * {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    MEAN,
-
-    /**
-     * Computes boolean AND over all added values.
-     * Applicable only to {@link Boolean} values.
-     */
-    AND,
-
-    /**
-     * Computes boolean OR over all added values. Applicable only to
-     * {@link Boolean} values.
-     */
-    OR
-    // TODO: consider adding VECTOR_SUM, HISTOGRAM, KV_SET, PRODUCT, TOP.
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Integer}, values
-   * according to the desired aggregation kind. The supported aggregation kinds
-   * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
-   * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
-   * This is a convenience wrapper over a
-   * {@link Counter} implementation that aggregates {@link Long} values. This is
-   * useful when the application handles (boxed) {@link Integer} values that
-   * are not readily convertible to the (boxed) {@link Long} values otherwise
-   * expected by the {@link Counter} implementation aggregating {@link Long}
-   * values.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Integer> ints(String name, AggregationKind kind) {
-    return new IntegerCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Long} values
-   * according to the desired aggregation kind. The supported aggregation kinds
-   * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
-   * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Long> longs(String name, AggregationKind kind) {
-    return new LongCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Double} values
-   * according to the desired aggregation kind. The supported aggregation kinds
-   * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
-   * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Double> doubles(String name, AggregationKind kind) {
-    return new DoubleCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Boolean} values
-   * according to the desired aggregation kind. The only supported aggregation
-   * kinds are {@link AggregationKind#AND} and {@link AggregationKind#OR}.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Boolean> booleans(String name, AggregationKind kind) {
-    return new BooleanCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link String} values
-   * according to the desired aggregation kind. The only supported aggregation
-   * kind is {@link AggregationKind#MIN} and {@link AggregationKind#MAX}.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  @SuppressWarnings("unused")
-  private static Counter<String> strings(String name, AggregationKind kind) {
-    return new StringCounter(name, kind);
-  }
-
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Adds a new value to the aggregation stream. Returns this (to allow method
-   * chaining).
-   */
-  public abstract Counter<T> addValue(T value);
-
-  /**
-   * Resets the aggregation stream to this new value. This aggregator must not
-   * be a MEAN aggregator. Returns this (to allow method chaining).
-   */
-  public abstract Counter<T> resetToValue(T value);
-
-  /**
-   * Resets the aggregation stream to this new value. Returns this (to allow
-   * method chaining). The value of elementCount must be non-negative, and this
-   * aggregator must be a MEAN aggregator.
-   */
-  public abstract Counter<T> resetMeanToValue(long elementCount, T value);
-
-  /**
-   * Resets the counter's delta value to have no values accumulated and returns
-   * the value of the delta prior to the reset.
-   *
-   * @return the aggregate delta at the time this method is called
-   */
-  public abstract T getAndResetDelta();
-
-  /**
-   * Resets the counter's delta value to have no values accumulated and returns
-   * the value of the delta prior to the reset, for a MEAN counter.
-   *
-   * @return the mean delta t the time this method is called
-   */
-  public abstract CounterMean<T> getAndResetMeanDelta();
-
-  /**
-   * Returns the counter's name.
-   */
-  public String getName() {
-    return name;
-  }
-
-  /**
-   * Returns the counter's aggregation kind.
-   */
-  public AggregationKind getKind() {
-    return kind;
-  }
-
-  /**
-   * Returns the counter's type.
-   */
-  public Class<?> getType() {
-    return new TypeDescriptor<T>(getClass()) {}.getRawType();
-  }
-
-  /**
-   * Returns the aggregated value, or the sum for MEAN aggregation, either
-   * total or, if delta, since the last update extraction or resetDelta.
-   */
-  public abstract T getAggregate();
-
-  /**
-   * The mean value of a {@code Counter}, represented as an aggregate value and
-   * a count.
-   *
-   * @param <T> the type of the aggregate
-   */
-  public static interface CounterMean<T> {
-    /**
-     * Gets the aggregate value of this {@code CounterMean}.
-     */
-    T getAggregate();
-
-    /**
-     * Gets the count of this {@code CounterMean}.
-     */
-    long getCount();
-  }
-
-  /**
-   * Returns the mean in the form of a CounterMean, or null if this is not a
-   * MEAN counter.
-   */
-  @Nullable
-  public abstract CounterMean<T> getMean();
-
-  /**
-   * Returns a string representation of the Counter. Useful for debugging logs.
-   * Example return value: "ElementCount:SUM(15)".
-   */
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(getName());
-    sb.append(":");
-    sb.append(getKind());
-    sb.append("(");
-    switch (kind) {
-      case SUM:
-      case MAX:
-      case MIN:
-      case AND:
-      case OR:
-        sb.append(getAggregate());
-        break;
-      case MEAN:
-        sb.append(getMean());
-        break;
-      default:
-        throw illegalArgumentException();
-    }
-    sb.append(")");
-
-    return sb.toString();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    } else if (o instanceof Counter) {
-      Counter<?> that = (Counter<?>) o;
-      if (this.name.equals(that.name) && this.kind == that.kind
-          && this.getClass().equals(that.getClass())) {
-        if (kind == MEAN) {
-          CounterMean<T> thisMean = this.getMean();
-          CounterMean<?> thatMean = that.getMean();
-          return thisMean == thatMean
-              || (Objects.equals(thisMean.getAggregate(), thatMean.getAggregate())
-                     && thisMean.getCount() == thatMean.getCount());
-        } else {
-          return Objects.equals(this.getAggregate(), that.getAggregate());
-        }
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    if (kind == MEAN) {
-      CounterMean<T> mean = getMean();
-      return Objects.hash(getClass(), name, kind, mean.getAggregate(), mean.getCount());
-    } else {
-      return Objects.hash(getClass(), name, kind, getAggregate());
-    }
-  }
-
-  /**
-   * Returns whether this Counter is compatible with that Counter.  If
-   * so, they can be merged into a single Counter.
-   */
-  public boolean isCompatibleWith(Counter<?> that) {
-    return this.name.equals(that.name)
-        && this.kind == that.kind
-        && this.getClass().equals(that.getClass());
-  }
-
-  /**
-   * Merges this counter with the provided counter, returning this counter with the combined value
-   * of both counters. This may reset the delta of this counter.
-   *
-   * @throws IllegalArgumentException if the provided Counter is not compatible with this Counter
-   */
-  public abstract Counter<T> merge(Counter<T> that);
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /** The name of this counter. */
-  protected final String name;
-
-  /** The kind of aggregation function to apply to this counter. */
-  protected final AggregationKind kind;
-
-  protected Counter(String name, AggregationKind kind) {
-    this.name = name;
-    this.kind = kind;
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Implements a {@link Counter} for {@link Long} values.
-   */
-  private static class LongCounter extends Counter<Long> {
-    private final AtomicLong aggregate;
-    private final AtomicLong deltaAggregate;
-    private final AtomicReference<LongCounterMean> mean;
-    private final AtomicReference<LongCounterMean> deltaMean;
-
-    /** Initializes a new {@link Counter} for {@link Long} values. */
-    private LongCounter(String name, AggregationKind kind) {
-      super(name, kind);
-      switch (kind) {
-        case MEAN:
-          mean = new AtomicReference<>();
-          deltaMean = new AtomicReference<>();
-          getAndResetMeanDelta();
-          mean.set(deltaMean.get());
-          aggregate = deltaAggregate = null;
-          break;
-        case SUM:
-        case MAX:
-        case MIN:
-          aggregate = new AtomicLong();
-          deltaAggregate = new AtomicLong();
-          getAndResetDelta();
-          aggregate.set(deltaAggregate.get());
-          mean = deltaMean = null;
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public LongCounter addValue(Long value) {
-      switch (kind) {
-        case SUM:
-          aggregate.addAndGet(value);
-          deltaAggregate.addAndGet(value);
-          break;
-        case MEAN:
-          addToMeanAndSet(value, mean);
-          addToMeanAndSet(value, deltaMean);
-          break;
-        case MAX:
-          maxAndSet(value, aggregate);
-          maxAndSet(value, deltaAggregate);
-          break;
-        case MIN:
-          minAndSet(value, aggregate);
-          minAndSet(value, deltaAggregate);
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-      return this;
-    }
-
-    private void minAndSet(Long value, AtomicLong target) {
-      long current;
-      long update;
-      do {
-        current = target.get();
-        update = Math.min(value, current);
-      } while (update < current && !target.compareAndSet(current, update));
-    }
-
-    private void maxAndSet(Long value, AtomicLong target) {
-      long current;
-      long update;
-      do {
-        current = target.get();
-        update = Math.max(value, current);
-      } while (update > current && !target.compareAndSet(current, update));
-    }
-
-    private void addToMeanAndSet(Long value, AtomicReference<LongCounterMean> target) {
-      LongCounterMean current;
-      LongCounterMean update;
-      do {
-        current = target.get();
-        update = new LongCounterMean(current.getAggregate() + value, current.getCount() + 1L);
-      } while (!target.compareAndSet(current, update));
-    }
-
-    @Override
-    public Long getAggregate() {
-      if (kind != MEAN) {
-        return aggregate.get();
-      } else {
-        return getMean().getAggregate();
-      }
-    }
-
-    @Override
-    public Long getAndResetDelta() {
-      switch (kind) {
-        case SUM:
-          return deltaAggregate.getAndSet(0L);
-        case MAX:
-          return deltaAggregate.getAndSet(Long.MIN_VALUE);
-        case MIN:
-          return deltaAggregate.getAndSet(Long.MAX_VALUE);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Long> resetToValue(Long value) {
-      if (kind == MEAN) {
-        throw illegalArgumentException();
-      }
-      aggregate.set(value);
-      deltaAggregate.set(value);
-      return this;
-    }
-
-    @Override
-    public Counter<Long> resetMeanToValue(long elementCount, Long value) {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      if (elementCount < 0) {
-        throw new IllegalArgumentException("elementCount must be non-negative");
-      }
-      LongCounterMean counterMean = new LongCounterMean(value, elementCount);
-      mean.set(counterMean);
-      deltaMean.set(counterMean);
-      return this;
-    }
-
-    @Override
-    public CounterMean<Long> getAndResetMeanDelta() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return deltaMean.getAndSet(new LongCounterMean(0L, 0L));
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Long> getMean() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return mean.get();
-    }
-
-    @Override
-    public Counter<Long> merge(Counter<Long> that) {
-      checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-      switch (kind) {
-        case SUM:
-        case MIN:
-        case MAX:
-          return addValue(that.getAggregate());
-        case MEAN:
-          CounterMean<Long> thisCounterMean = this.getMean();
-          CounterMean<Long> thatCounterMean = that.getMean();
-          return resetMeanToValue(
-              thisCounterMean.getCount() + thatCounterMean.getCount(),
-              thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    private static class LongCounterMean implements CounterMean<Long> {
-      private final long aggregate;
-      private final long count;
-
-      public LongCounterMean(long aggregate, long count) {
-        this.aggregate = aggregate;
-        this.count = count;
-      }
-
-      @Override
-      public Long getAggregate() {
-        return aggregate;
-      }
-
-      @Override
-      public long getCount() {
-        return count;
-      }
-
-      @Override
-      public String toString() {
-        return aggregate + "/" + count;
-      }
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link Double} values.
-   */
-  private static class DoubleCounter extends Counter<Double> {
-    AtomicDouble aggregate;
-    AtomicDouble deltaAggregate;
-    AtomicReference<DoubleCounterMean> mean;
-    AtomicReference<DoubleCounterMean> deltaMean;
-
-    /** Initializes a new {@link Counter} for {@link Double} values. */
-    private DoubleCounter(String name, AggregationKind kind) {
-      super(name, kind);
-      switch (kind) {
-        case MEAN:
-          aggregate = deltaAggregate = null;
-          mean = new AtomicReference<>();
-          deltaMean = new AtomicReference<>();
-          getAndResetMeanDelta();
-          mean.set(deltaMean.get());
-          break;
-        case SUM:
-        case MAX:
-        case MIN:
-          mean = deltaMean = null;
-          aggregate = new AtomicDouble();
-          deltaAggregate = new AtomicDouble();
-          getAndResetDelta();
-          aggregate.set(deltaAggregate.get());
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public DoubleCounter addValue(Double value) {
-      switch (kind) {
-        case SUM:
-          aggregate.addAndGet(value);
-          deltaAggregate.addAndGet(value);
-          break;
-        case MEAN:
-          addToMeanAndSet(value, mean);
-          addToMeanAndSet(value, deltaMean);
-          break;
-        case MAX:
-          maxAndSet(value, aggregate);
-          maxAndSet(value, deltaAggregate);
-          break;
-        case MIN:
-          minAndSet(value, aggregate);
-          minAndSet(value, deltaAggregate);
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-      return this;
-    }
-
-    private void addToMeanAndSet(Double value, AtomicReference<DoubleCounterMean> target) {
-      DoubleCounterMean current;
-      DoubleCounterMean update;
-      do {
-        current = target.get();
-        update = new DoubleCounterMean(current.getAggregate() + value, current.getCount() + 1);
-      } while (!target.compareAndSet(current, update));
-    }
-
-    private void maxAndSet(Double value, AtomicDouble target) {
-      double current;
-      double update;
-      do {
-        current = target.get();
-        update = Math.max(current, value);
-      } while (update > current && !target.compareAndSet(current, update));
-    }
-
-    private void minAndSet(Double value, AtomicDouble target) {
-      double current;
-      double update;
-      do {
-        current = target.get();
-        update = Math.min(current, value);
-      } while (update < current && !target.compareAndSet(current, update));
-    }
-
-    @Override
-    public Double getAndResetDelta() {
-      switch (kind) {
-        case SUM:
-          return deltaAggregate.getAndSet(0.0);
-        case MAX:
-          return deltaAggregate.getAndSet(Double.NEGATIVE_INFINITY);
-        case MIN:
-          return deltaAggregate.getAndSet(Double.POSITIVE_INFINITY);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Double> resetToValue(Double value) {
-      if (kind == MEAN) {
-        throw illegalArgumentException();
-      }
-      aggregate.set(value);
-      deltaAggregate.set(value);
-      return this;
-    }
-
-    @Override
-    public Counter<Double> resetMeanToValue(long elementCount, Double value) {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      if (elementCount < 0) {
-        throw new IllegalArgumentException("elementCount must be non-negative");
-      }
-      DoubleCounterMean counterMean = new DoubleCounterMean(value, elementCount);
-      mean.set(counterMean);
-      deltaMean.set(counterMean);
-      return this;
-    }
-
-    @Override
-    public CounterMean<Double> getAndResetMeanDelta() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return deltaMean.getAndSet(new DoubleCounterMean(0.0, 0L));
-    }
-
-    @Override
-    public Double getAggregate() {
-      if (kind != MEAN) {
-        return aggregate.get();
-      } else {
-        return getMean().getAggregate();
-      }
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Double> getMean() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return mean.get();
-    }
-
-    @Override
-    public Counter<Double> merge(Counter<Double> that) {
-      checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-      switch (kind) {
-        case SUM:
-        case MIN:
-        case MAX:
-          return addValue(that.getAggregate());
-        case MEAN:
-          CounterMean<Double> thisCounterMean = this.getMean();
-          CounterMean<Double> thatCounterMean = that.getMean();
-          return resetMeanToValue(
-              thisCounterMean.getCount() + thatCounterMean.getCount(),
-              thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    private static class DoubleCounterMean implements CounterMean<Double> {
-      private final double aggregate;
-      private final long count;
-
-      public DoubleCounterMean(double aggregate, long count) {
-        this.aggregate = aggregate;
-        this.count = count;
-      }
-
-      @Override
-      public Double getAggregate() {
-        return aggregate;
-      }
-
-      @Override
-      public long getCount() {
-        return count;
-      }
-
-      @Override
-      public String toString() {
-        return aggregate + "/" + count;
-      }
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link Boolean} values.
-   */
-  private static class BooleanCounter extends Counter<Boolean> {
-    private final AtomicBoolean aggregate;
-    private final AtomicBoolean deltaAggregate;
-
-    /** Initializes a new {@link Counter} for {@link Boolean} values. */
-    private BooleanCounter(String name, AggregationKind kind) {
-      super(name, kind);
-      aggregate = new AtomicBoolean();
-      deltaAggregate = new AtomicBoolean();
-      getAndResetDelta();
-      aggregate.set(deltaAggregate.get());
-    }
-
-    @Override
-    public BooleanCounter addValue(Boolean value) {
-      if (kind.equals(AND) && !value) {
-        aggregate.set(value);
-        deltaAggregate.set(value);
-      } else if (kind.equals(OR) && value) {
-        aggregate.set(value);
-        deltaAggregate.set(value);
-      }
-      return this;
-    }
-
-    @Override
-    public Boolean getAndResetDelta() {
-      switch (kind) {
-        case AND:
-          return deltaAggregate.getAndSet(true);
-        case OR:
-          return deltaAggregate.getAndSet(false);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Boolean> resetToValue(Boolean value) {
-      aggregate.set(value);
-      deltaAggregate.set(value);
-      return this;
-    }
-
-    @Override
-    public Counter<Boolean> resetMeanToValue(long elementCount, Boolean value) {
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public CounterMean<Boolean> getAndResetMeanDelta() {
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public Boolean getAggregate() {
-      return aggregate.get();
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Boolean> getMean() {
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public Counter<Boolean> merge(Counter<Boolean> that) {
-      checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-      return addValue(that.getAggregate());
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link String} values.
-   */
-  private static class StringCounter extends Counter<String> {
-    /** Initializes a new {@link Counter} for {@link String} values. */
-    private StringCounter(String name, AggregationKind kind) {
-      super(name, kind);
-      // TODO: Support MIN, MAX of Strings.
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public StringCounter addValue(String value) {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<String> resetToValue(String value) {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<String> resetMeanToValue(long elementCount, String value) {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public String getAndResetDelta() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public CounterMean<String> getAndResetMeanDelta() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public String getAggregate() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<String> getMean() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<String> merge(Counter<String> that) {
-      checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link Integer} values.
-   */
-  private static class IntegerCounter extends Counter<Integer> {
-    private final AtomicInteger aggregate;
-    private final AtomicInteger deltaAggregate;
-    private final AtomicReference<IntegerCounterMean> mean;
-    private final AtomicReference<IntegerCounterMean> deltaMean;
-
-    /** Initializes a new {@link Counter} for {@link Integer} values. */
-    private IntegerCounter(String name, AggregationKind kind) {
-      super(name, kind);
-      switch (kind) {
-        case MEAN:
-          aggregate = deltaAggregate = null;
-          mean = new AtomicReference<>();
-          deltaMean = new AtomicReference<>();
-          getAndResetMeanDelta();
-          mean.set(deltaMean.get());
-          break;
-        case SUM:
-        case MAX:
-        case MIN:
-          mean = deltaMean = null;
-          aggregate = new AtomicInteger();
-          deltaAggregate = new AtomicInteger();
-          getAndResetDelta();
-          aggregate.set(deltaAggregate.get());
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public IntegerCounter addValue(Integer value) {
-      switch (kind) {
-        case SUM:
-          aggregate.getAndAdd(value);
-          deltaAggregate.getAndAdd(value);
-          break;
-        case MEAN:
-          addToMeanAndSet(value, mean);
-          addToMeanAndSet(value, deltaMean);
-          break;
-        case MAX:
-          maxAndSet(value, aggregate);
-          maxAndSet(value, deltaAggregate);
-          break;
-        case MIN:
-          minAndSet(value, aggregate);
-          minAndSet(value, deltaAggregate);
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-      return this;
-    }
-
-    private void addToMeanAndSet(int value, AtomicReference<IntegerCounterMean> target) {
-      IntegerCounterMean current;
-      IntegerCounterMean update;
-      do {
-        current = target.get();
-        update = new IntegerCounterMean(current.getAggregate() + value, current.getCount() + 1);
-      } while (!target.compareAndSet(current, update));
-    }
-
-    private void maxAndSet(int value, AtomicInteger target) {
-      int current;
-      int update;
-      do {
-        current = target.get();
-        update = Math.max(value, current);
-      } while (update > current && !target.compareAndSet(current, update));
-    }
-
-    private void minAndSet(int value, AtomicInteger target) {
-      int current;
-      int update;
-      do {
-        current = target.get();
-        update = Math.min(value, current);
-      } while (update < current && !target.compareAndSet(current, update));
-    }
-
-    @Override
-    public Integer getAndResetDelta() {
-      switch (kind) {
-        case SUM:
-          return deltaAggregate.getAndSet(0);
-        case MAX:
-          return deltaAggregate.getAndSet(Integer.MIN_VALUE);
-        case MIN:
-          return deltaAggregate.getAndSet(Integer.MAX_VALUE);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Integer> resetToValue(Integer value) {
-      if (kind == MEAN) {
-        throw illegalArgumentException();
-      }
-      aggregate.set(value);
-      deltaAggregate.set(value);
-      return this;
-    }
-
-    @Override
-    public Counter<Integer> resetMeanToValue(long elementCount, Integer value) {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      if (elementCount < 0) {
-        throw new IllegalArgumentException("elementCount must be non-negative");
-      }
-      IntegerCounterMean counterMean = new IntegerCounterMean(value, elementCount);
-      mean.set(counterMean);
-      deltaMean.set(counterMean);
-      return this;
-    }
-
-    @Override
-    public CounterMean<Integer> getAndResetMeanDelta() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return deltaMean.getAndSet(new IntegerCounterMean(0, 0L));
-    }
-
-    @Override
-    public Integer getAggregate() {
-      if (kind != MEAN) {
-        return aggregate.get();
-      } else {
-        return getMean().getAggregate();
-      }
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Integer> getMean() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return mean.get();
-    }
-
-    @Override
-    public Counter<Integer> merge(Counter<Integer> that) {
-      checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-      switch (kind) {
-        case SUM:
-        case MIN:
-        case MAX:
-          return addValue(that.getAggregate());
-        case MEAN:
-          CounterMean<Integer> thisCounterMean = this.getMean();
-          CounterMean<Integer> thatCounterMean = that.getMean();
-          return resetMeanToValue(
-              thisCounterMean.getCount() + thatCounterMean.getCount(),
-              thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    private static class IntegerCounterMean implements CounterMean<Integer> {
-      private final int aggregate;
-      private final long count;
-
-      public IntegerCounterMean(int aggregate, long count) {
-        this.aggregate = aggregate;
-        this.count = count;
-      }
-
-      @Override
-      public Integer getAggregate() {
-        return aggregate;
-      }
-
-      @Override
-      public long getCount() {
-        return count;
-      }
-
-      @Override
-      public String toString() {
-        return aggregate + "/" + count;
-      }
-    }
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Constructs an {@link IllegalArgumentException} explaining that this
-   * {@link Counter}'s aggregation kind is not supported by its value type.
-   */
-  protected IllegalArgumentException illegalArgumentException() {
-    return new IllegalArgumentException("Cannot compute " + kind
-        + " aggregation over " + getType().getSimpleName() + " values.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/CounterProvider.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/CounterProvider.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/CounterProvider.java
deleted file mode 100644
index ba53f80..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/CounterProvider.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util.common;
-
-/**
- * A counter provider can provide {@link Counter} instances.
- *
- * @param <T> the input type of the counter.
- */
-public interface CounterProvider<T> {
-  Counter<T> getCounter(String name);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/CounterSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/CounterSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/CounterSet.java
deleted file mode 100644
index 9e9638f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/CounterSet.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.AbstractSet;
-import java.util.HashMap;
-import java.util.Iterator;
-
-/**
- * A CounterSet maintains a set of {@link Counter}s.
- *
- * <p>Thread-safe.
- */
-public class CounterSet extends AbstractSet<Counter<?>> {
-
-  /** Registered counters. */
-  private final HashMap<String, Counter<?>> counters = new HashMap<>();
-
-  private final AddCounterMutator addCounterMutator = new AddCounterMutator();
-
-  /**
-   * Constructs a CounterSet containing the given Counters.
-   */
-  public CounterSet(Counter<?>... counters) {
-    for (Counter<?> counter : counters) {
-      addNewCounter(counter);
-    }
-  }
-
-  /**
-   * Returns an object that supports adding additional counters into
-   * this CounterSet.
-   */
-  public AddCounterMutator getAddCounterMutator() {
-    return addCounterMutator;
-  }
-
-  /**
-   * Adds a new counter, throwing an exception if a counter of the
-   * same name already exists.
-   */
-  public void addNewCounter(Counter<?> counter) {
-    if (!addCounter(counter)) {
-      throw new IllegalArgumentException(
-          "Counter " + counter + " duplicates an existing counter in " + this);
-    }
-  }
-
-  /**
-   * Adds the given Counter to this CounterSet.
-   *
-   * <p>If a counter with the same name already exists, it will be
-   * reused, as long as it is compatible.
-   *
-   * @return the Counter that was reused, or added
-   * @throws IllegalArgumentException if a counter with the same
-   * name but an incompatible kind had already been added
-   */
-  public synchronized <T> Counter<T> addOrReuseCounter(Counter<T> counter) {
-    Counter<?> oldCounter = counters.get(counter.getName());
-    if (oldCounter == null) {
-      // A new counter.
-      counters.put(counter.getName(), counter);
-      return counter;
-    }
-    if (counter.isCompatibleWith(oldCounter)) {
-      // Return the counter to reuse.
-      @SuppressWarnings("unchecked")
-      Counter<T> compatibleCounter = (Counter<T>) oldCounter;
-      return compatibleCounter;
-    }
-    throw new IllegalArgumentException(
-        "Counter " + counter + " duplicates incompatible counter "
-        + oldCounter + " in " + this);
-  }
-
-  /**
-   * Adds a counter. Returns {@code true} if the counter was added to the set
-   * and false if the given counter was {@code null} or it already existed in
-   * the set.
-   *
-   * @param counter to register
-   */
-  public boolean addCounter(Counter<?> counter) {
-    return add(counter);
-  }
-
-  /**
-   * Returns the Counter with the given name in this CounterSet;
-   * returns null if no such Counter exists.
-   */
-  public synchronized Counter<?> getExistingCounter(String name) {
-    return counters.get(name);
-  }
-
-  @Override
-  public synchronized Iterator<Counter<?>> iterator() {
-    return counters.values().iterator();
-  }
-
-  @Override
-  public synchronized int size() {
-    return counters.size();
-  }
-
-  @Override
-  public synchronized boolean add(Counter<?> e) {
-    if (null == e) {
-      return false;
-    }
-    if (counters.containsKey(e.getName())) {
-      return false;
-    }
-    counters.put(e.getName(), e);
-    return true;
-  }
-
-  public synchronized void merge(CounterSet that) {
-    for (Counter<?> theirCounter : that) {
-      Counter<?> myCounter = counters.get(theirCounter.getName());
-      if (myCounter != null) {
-        mergeCounters(myCounter, theirCounter);
-      } else {
-        addCounter(theirCounter);
-      }
-    }
-  }
-
-  private <T> void mergeCounters(Counter<T> mine, Counter<?> theirCounter) {
-    checkArgument(
-        mine.isCompatibleWith(theirCounter),
-        "Can't merge CounterSets containing incompatible counters with the same name: "
-            + "%s (existing) and %s (merged)",
-        mine,
-        theirCounter);
-    @SuppressWarnings("unchecked")
-    Counter<T> theirs = (Counter<T>) theirCounter;
-    mine.merge(theirs);
-  }
-
-  /**
-   * A nested class that supports adding additional counters into the
-   * enclosing CounterSet. This is useful as a mutator, hiding other
-   * public methods of the CounterSet.
-   */
-  public class AddCounterMutator {
-    /**
-     * Adds the given Counter into the enclosing CounterSet.
-     *
-     * <p>If a counter with the same name already exists, it will be
-     * reused, as long as it has the same type.
-     *
-     * @return the Counter that was reused, or added
-     * @throws IllegalArgumentException if a counter with the same
-     * name but an incompatible kind had already been added
-     */
-    public <T> Counter<T> addCounter(Counter<T> counter) {
-      return addOrReuseCounter(counter);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservable.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservable.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservable.java
deleted file mode 100644
index fee6737..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util.common;
-
-/**
- * An interface for things that allow observing the size in bytes of
- * encoded values of type {@code T}.
- *
- * @param <T> the type of the values being observed
- */
-public interface ElementByteSizeObservable<T> {
-  /**
-   * Returns whether {@link #registerByteSizeObserver} is cheap enough
-   * to call for every element, that is, if this
-   * {@code ElementByteSizeObservable} can calculate the byte size of
-   * the element to be coded in roughly constant time (or lazily).
-   */
-  public boolean isRegisterByteSizeObserverCheap(T value);
-
-  /**
-   * Notifies the {@code ElementByteSizeObserver} about the byte size
-   * of the encoded value using this {@code ElementByteSizeObservable}.
-   */
-  public void registerByteSizeObserver(T value,
-                                       ElementByteSizeObserver observer)
-      throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservableIterable.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservableIterable.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservableIterable.java
deleted file mode 100644
index 591d2be..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservableIterable.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Observer;
-
-/**
- * An abstract class used for iterables that notify observers about size in
- * bytes of their elements, as they are being iterated over.
- *
- * @param <V> the type of elements returned by this iterable
- * @param <InputT> type type of iterator returned by this iterable
- */
-public abstract class ElementByteSizeObservableIterable<
-    V, InputT extends ElementByteSizeObservableIterator<V>>
-    implements Iterable<V> {
-  private List<Observer> observers = new ArrayList<>();
-
-  /**
-   * Derived classes override this method to return an iterator for this
-   * iterable.
-   */
-  protected abstract InputT createIterator();
-
-  /**
-   * Sets the observer, which will observe the iterator returned in
-   * the next call to iterator() method. Future calls to iterator()
-   * won't be observed, unless an observer is set again.
-   */
-  public void addObserver(Observer observer) {
-    observers.add(observer);
-  }
-
-  /**
-   * Returns a new iterator for this iterable. If an observer was set in
-   * a previous call to setObserver(), it will observe the iterator returned.
-   */
-  @Override
-  public InputT iterator() {
-    InputT iterator = createIterator();
-    for (Observer observer : observers) {
-      iterator.addObserver(observer);
-    }
-    observers.clear();
-    return iterator;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservableIterator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservableIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservableIterator.java
deleted file mode 100644
index c094900..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObservableIterator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common;
-
-import java.util.Iterator;
-import java.util.Observable;
-
-/**
- * An abstract class used for iterators that notify observers about size in
- * bytes of their elements, as they are being iterated over. The subclasses
- * need to implement the standard Iterator interface and call method
- * notifyValueReturned() for each element read and/or iterated over.
- *
- * @param <V> value type
- */
-public abstract class ElementByteSizeObservableIterator<V>
-    extends Observable implements Iterator<V> {
-  protected final void notifyValueReturned(long byteSize) {
-    setChanged();
-    notifyObservers(byteSize);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObserver.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObserver.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObserver.java
deleted file mode 100644
index 6c764d9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ElementByteSizeObserver.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common;
-
-import java.util.Observable;
-import java.util.Observer;
-
-/**
- * An observer that gets notified when additional bytes are read
- * and/or used. It adds all bytes into a local counter. When the
- * observer gets advanced via the next() call, it adds the total byte
- * count to the specified counter, and prepares for the next element.
- */
-public class ElementByteSizeObserver implements Observer {
-  private final Counter<Long> counter;
-  private boolean isLazy = false;
-  private long totalSize = 0;
-  private double scalingFactor = 1.0;
-
-  public ElementByteSizeObserver(Counter<Long> counter) {
-    this.counter = counter;
-  }
-
-  /**
-   * Sets byte counting for the current element as lazy. That is, the
-   * observer will get notified of the element's byte count only as
-   * element's pieces are being processed or iterated over.
-   */
-  public void setLazy() {
-    isLazy = true;
-  }
-
-  /**
-   * Returns whether byte counting for the current element is lazy, that is,
-   * whether the observer gets notified of the element's byte count only as
-   * element's pieces are being processed or iterated over.
-   */
-  public boolean getIsLazy() {
-    return isLazy;
-  }
-
-  /**
-   * Updates the observer with a context specified, but without an instance of
-   * the Observable.
-   */
-  public void update(Object obj) {
-    update(null, obj);
-  }
-
-  /**
-   * Sets a multiplier to use on observed sizes.
-   */
-  public void setScalingFactor(double scalingFactor) {
-    this.scalingFactor = scalingFactor;
-  }
-
-  @Override
-  public void update(Observable obs, Object obj) {
-    if (obj instanceof Long) {
-      totalSize += scalingFactor * (Long) obj;
-    } else if (obj instanceof Integer) {
-      totalSize += scalingFactor * (Integer) obj;
-    } else {
-      throw new AssertionError("unexpected parameter object");
-    }
-  }
-
-  /**
-   * Advances the observer to the next element. Adds the current total byte
-   * size to the counter, and prepares the observer for the next element.
-   */
-  public void advance() {
-    counter.addValue(totalSize);
-
-    totalSize = 0;
-    isLazy = false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/PeekingReiterator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/PeekingReiterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/PeekingReiterator.java
deleted file mode 100644
index 0948747..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/PeekingReiterator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.util.NoSuchElementException;
-
-/**
- * A {@link Reiterator} that supports one-element lookahead during iteration.
- *
- * @param <T> the type of elements returned by this iterator
- */
-public final class PeekingReiterator<T> implements Reiterator<T> {
-  private T nextElement;
-  private boolean nextElementComputed;
-  private final Reiterator<T> iterator;
-
-  public PeekingReiterator(Reiterator<T> iterator) {
-    this.iterator = checkNotNull(iterator);
-  }
-
-  PeekingReiterator(PeekingReiterator<T> it) {
-    this.iterator = checkNotNull(checkNotNull(it).iterator.copy());
-    this.nextElement = it.nextElement;
-    this.nextElementComputed = it.nextElementComputed;
-  }
-
-  @Override
-  public boolean hasNext() {
-    computeNext();
-    return nextElementComputed;
-  }
-
-  @Override
-  public T next() {
-    T result = peek();
-    nextElementComputed = false;
-    return result;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>If {@link #peek} is called, {@code remove} is disallowed until
-   * {@link #next} has been subsequently called.
-   */
-  @Override
-  public void remove() {
-    checkState(!nextElementComputed,
-        "After peek(), remove() is disallowed until next() is called");
-    iterator.remove();
-  }
-
-  @Override
-  public PeekingReiterator<T> copy() {
-    return new PeekingReiterator<>(this);
-  }
-
-  /**
-   * Returns the element that would be returned by {@link #next}, without
-   * actually consuming the element.
-   * @throws NoSuchElementException if there is no next element
-   */
-  public T peek() {
-    computeNext();
-    if (!nextElementComputed) {
-      throw new NoSuchElementException();
-    }
-    return nextElement;
-  }
-
-  private void computeNext() {
-    if (nextElementComputed) {
-      return;
-    }
-    if (!iterator.hasNext()) {
-      return;
-    }
-    nextElement = iterator.next();
-    nextElementComputed = true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ReflectHelpers.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ReflectHelpers.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ReflectHelpers.java
deleted file mode 100644
index f87242f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/ReflectHelpers.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util.common;
-
-import static java.util.Arrays.asList;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Queues;
-
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.lang.reflect.WildcardType;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.Queue;
-
-import javax.annotation.Nullable;
-
-/**
- * Utilities for working with with {@link Class Classes} and {@link Method Methods}.
- */
-public class ReflectHelpers {
-
-  private static final Joiner COMMA_SEPARATOR = Joiner.on(", ");
-
-  /** A {@link Function} that turns a method into a simple method signature. */
-  public static final Function<Method, String> METHOD_FORMATTER = new Function<Method, String>() {
-    @Override
-    public String apply(Method input) {
-      String parameterTypes = FluentIterable.from(asList(input.getParameterTypes()))
-          .transform(CLASS_SIMPLE_NAME)
-          .join(COMMA_SEPARATOR);
-      return String.format("%s(%s)",
-          input.getName(),
-          parameterTypes);
-    }
-  };
-
-  /** A {@link Function} that turns a method into the declaring class + method signature. */
-  public static final Function<Method, String> CLASS_AND_METHOD_FORMATTER =
-      new Function<Method, String>() {
-    @Override
-    public String apply(Method input) {
-      return String.format("%s#%s",
-          CLASS_NAME.apply(input.getDeclaringClass()),
-          METHOD_FORMATTER.apply(input));
-    }
-  };
-
-  /** A {@link Function} with returns the classes name. */
-  public static final Function<Class<?>, String> CLASS_NAME =
-      new Function<Class<?>, String>() {
-    @Override
-    public String apply(Class<?> input) {
-      return input.getName();
-    }
-  };
-
-  /** A {@link Function} with returns the classes name. */
-  public static final Function<Class<?>, String> CLASS_SIMPLE_NAME =
-      new Function<Class<?>, String>() {
-    @Override
-    public String apply(Class<?> input) {
-      return input.getSimpleName();
-    }
-  };
-
-  /** A {@link Function} that formats types. */
-  public static final Function<Type, String> TYPE_SIMPLE_DESCRIPTION =
-      new Function<Type, String>() {
-    @Override
-    @Nullable
-    public String apply(@Nullable Type input) {
-      StringBuilder builder = new StringBuilder();
-      format(builder, input);
-      return builder.toString();
-    }
-
-    private void format(StringBuilder builder, Type t) {
-      if (t instanceof Class) {
-        formatClass(builder, (Class<?>) t);
-      } else if (t instanceof TypeVariable) {
-        formatTypeVariable(builder, (TypeVariable<?>) t);
-      } else if (t instanceof WildcardType) {
-        formatWildcardType(builder, (WildcardType) t);
-      } else if (t instanceof ParameterizedType) {
-        formatParameterizedType(builder, (ParameterizedType) t);
-      } else if (t instanceof GenericArrayType) {
-        formatGenericArrayType(builder, (GenericArrayType) t);
-      } else {
-        builder.append(t.toString());
-      }
-    }
-
-    private void formatClass(StringBuilder builder, Class<?> clazz) {
-      builder.append(clazz.getSimpleName());
-    }
-
-    private void formatTypeVariable(StringBuilder builder, TypeVariable<?> t) {
-      builder.append(t.getName());
-    }
-
-    private void formatWildcardType(StringBuilder builder, WildcardType t) {
-      builder.append("?");
-      for (Type lowerBound : t.getLowerBounds()) {
-        builder.append(" super ");
-        format(builder, lowerBound);
-      }
-      for (Type upperBound : t.getUpperBounds()) {
-        if (!Object.class.equals(upperBound)) {
-          builder.append(" extends ");
-          format(builder, upperBound);
-        }
-      }
-    }
-
-    private void formatParameterizedType(StringBuilder builder, ParameterizedType t) {
-      format(builder, t.getRawType());
-      builder.append('<');
-      COMMA_SEPARATOR.appendTo(builder,
-          FluentIterable.from(asList(t.getActualTypeArguments()))
-          .transform(TYPE_SIMPLE_DESCRIPTION));
-      builder.append('>');
-    }
-
-    private void formatGenericArrayType(StringBuilder builder, GenericArrayType t) {
-      format(builder, t.getGenericComponentType());
-      builder.append("[]");
-    }
-  };
-
-  /**
-   * Returns all interfaces of the given clazz.
-   * @param clazz
-   * @return
-   */
-  public static FluentIterable<Class<?>> getClosureOfInterfaces(Class<?> clazz) {
-    Preconditions.checkNotNull(clazz);
-    Queue<Class<?>> interfacesToProcess = Queues.newArrayDeque();
-    Collections.addAll(interfacesToProcess, clazz.getInterfaces());
-
-    LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
-    while (!interfacesToProcess.isEmpty()) {
-      Class<?> current = interfacesToProcess.remove();
-      if (interfaces.add(current)) {
-        Collections.addAll(interfacesToProcess, current.getInterfaces());
-      }
-    }
-    return FluentIterable.from(interfaces);
-  }
-
-  /**
-   * Returns all the methods visible from the provided interfaces.
-   *
-   * @param interfaces The interfaces to use when searching for all their methods.
-   * @return An iterable of {@link Method}s which interfaces expose.
-   */
-  public static Iterable<Method> getClosureOfMethodsOnInterfaces(
-      Iterable<? extends Class<?>> interfaces) {
-    return FluentIterable.from(interfaces).transformAndConcat(
-        new Function<Class<?>, Iterable<Method>>() {
-          @Override
-          public Iterable<Method> apply(Class<?> input) {
-            return getClosureOfMethodsOnInterface(input);
-          }
-    });
-  }
-
-  /**
-   * Returns all the methods visible from {@code iface}.
-   *
-   * @param iface The interface to use when searching for all its methods.
-   * @return An iterable of {@link Method}s which {@code iface} exposes.
-   */
-  public static Iterable<Method> getClosureOfMethodsOnInterface(Class<?> iface) {
-    Preconditions.checkNotNull(iface);
-    Preconditions.checkArgument(iface.isInterface());
-    ImmutableSet.Builder<Method> builder = ImmutableSet.builder();
-    Queue<Class<?>> interfacesToProcess = Queues.newArrayDeque();
-    interfacesToProcess.add(iface);
-    while (!interfacesToProcess.isEmpty()) {
-      Class<?> current = interfacesToProcess.remove();
-      builder.add(current.getMethods());
-      interfacesToProcess.addAll(Arrays.asList(current.getInterfaces()));
-    }
-    return builder.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Reiterable.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Reiterable.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Reiterable.java
deleted file mode 100644
index 01c5775..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Reiterable.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common;
-
-/**
- * An {@link Iterable} that returns {@link Reiterator} iterators.
- *
- * @param <T> the type of elements returned by the iterator
- */
-public interface Reiterable<T> extends Iterable<T> {
-  @Override
-  public Reiterator<T> iterator();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Reiterator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Reiterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Reiterator.java
deleted file mode 100644
index dd8036d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/Reiterator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common;
-
-import java.util.Iterator;
-
-/**
- * An {@link Iterator} with the ability to copy its iteration state.
- *
- * @param <T> the type of elements returned by this iterator
- */
-public interface Reiterator<T> extends Iterator<T> {
-  /**
-   * Returns a copy of the current {@link Reiterator}.  The copy's iteration
-   * state is logically independent of the current iterator; each may be
-   * advanced without affecting the other.
-   *
-   * <p>The returned {@code Reiterator} is not guaranteed to return
-   * referentially identical iteration results as the original
-   * {@link Reiterator}, although {@link Object#equals} will typically return
-   * true for the corresponding elements of each if the original source is
-   * logically immutable.
-   */
-  public Reiterator<T> copy();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/package-info.java
deleted file mode 100644
index 7fb16c5..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/** Defines utilities shared by multiple PipelineRunner implementations. **/
-package com.google.cloud.dataflow.sdk.util.common;


Mime
View raw message