beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [5/7] beam git commit: [BEAM-2135] Move gcp-core to google-cloud-platform-core
Date Tue, 02 May 2017 17:57:10 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
deleted file mode 100644
index c8e6839..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.api.client.googleapis.batch.BatchRequest;
-import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
-import com.google.api.client.googleapis.json.GoogleJsonError;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.http.HttpHeaders;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.model.Bucket;
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
-import com.google.auto.value.AutoValue;
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
-import com.google.cloud.hadoop.gcsio.ObjectWriteConditions;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
-import com.google.cloud.hadoop.util.ClientRequestHelper;
-import com.google.cloud.hadoop.util.ResilientOperation;
-import com.google.cloud.hadoop.util.RetryDeterminer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.AccessDeniedException;
-import java.nio.file.FileAlreadyExistsException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides operations on GCS.
- *
- * <p>Covers glob expansion against bucket listings, object metadata lookup (singly and in
- * batches), opening objects for reading, creating objects for writing, bucket
- * accessibility/ownership checks, bucket creation, and batched copy/delete requests.
- */
-public class GcsUtil {
-  /**
-   * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using
-   * any transport flags specified on the {@link PipelineOptions}.
-   */
-  public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> {
-    /**
-     * Returns a new instance of {@link GcsUtil} built from the {@link GcsOptions} view of
-     * {@code options}: a storage client from {@code Transport.newStorageClient}, the options'
-     * executor service, and the configured GCS upload buffer size.
-     *
-     * <p>This method always constructs a new instance. Any caching of the result in
-     * {@code options} is presumably handled by the {@link DefaultValueFactory} machinery
-     * rather than here.
-     */
-    @Override
-    public GcsUtil create(PipelineOptions options) {
-      LOG.debug("Creating new GcsUtil");
-      GcsOptions gcsOptions = options.as(GcsOptions.class);
-      Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
-      return new GcsUtil(
-          storageBuilder.build(),
-          storageBuilder.getHttpRequestInitializer(),
-          gcsOptions.getExecutorService(),
-          gcsOptions.getGcsUploadBufferSizeBytes());
-    }
-
-    /**
-     * Returns an instance of {@link GcsUtil} based on the given parameters.
-     *
-     * @param storageClient client used for all GCS API calls
-     * @param httpRequestInitializer initializer passed to every batch request
-     * @param executorService executor handed to write channels created by
-     *     {@link GcsUtil#create(GcsPath, String)}
-     * @param uploadBufferSizeBytes upload buffer size for write channels, or {@code null} to
-     *     keep the channel's own default
-     */
-    public static GcsUtil create(
-        Storage storageClient,
-        HttpRequestInitializer httpRequestInitializer,
-        ExecutorService executorService,
-        @Nullable Integer uploadBufferSizeBytes) {
-      return new GcsUtil(
-          storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes);
-    }
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);
-
-  /** Maximum number of items to retrieve per Objects.List request. */
-  private static final long MAX_LIST_ITEMS_PER_CALL = 1024;
-
-  /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */
-  private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");
-
-  /** Regex fragment matching two consecutive {@code *} characters (a recursive wildcard). */
-  private static final String RECURSIVE_WILDCARD = "[*]{2}";
-
-  /**
-   * A {@link Pattern} for globs with a recursive wildcard.
-   */
-  private static final Pattern RECURSIVE_GCS_PATTERN =
-      Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*");
-
-  /**
-   * Maximum number of requests permitted in a GCS batch request.
-   */
-  private static final int MAX_REQUESTS_PER_BATCH = 100;
-  /**
-   * Maximum number of concurrent batches of requests executing on GCS.
-   */
-  private static final int MAX_CONCURRENT_BATCHES = 256;
-
-  /** Backoff policy for single (non-batched) API calls: up to 3 retries, 200ms initial delay. */
-  private static final FluentBackoff BACKOFF_FACTORY =
-      FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /** Client for the GCS API. Non-final only so tests can swap it via setStorageClient. */
-  private Storage storageClient;
-  /** Initializer applied to every batch request created by this instance. */
-  private final HttpRequestInitializer httpRequestInitializer;
-  /** Buffer size for GCS uploads (in bytes), or null to use the write channel's default. */
-  @Nullable private final Integer uploadBufferSizeBytes;
-
-  // Helper delegate for turning IOExceptions from API calls into higher-level semantics.
-  private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-
-  // Exposed for testing.
-  final ExecutorService executorService;
-
-  /**
-   * Returns true if the given GCS pattern is supported otherwise fails with an
-   * exception.
-   *
-   * @throws IllegalArgumentException if the pattern contains a recursive wildcard ({@code **})
-   */
-  public static boolean isGcsPatternSupported(String gcsPattern) {
-    if (RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches()) {
-      throw new IllegalArgumentException("Unsupported wildcard usage in \"" + gcsPattern + "\": "
-          + " recursive wildcards are not supported.");
-    }
-    return true;
-  }
-
-  /**
-   * Returns the prefix portion of the glob that doesn't contain wildcards.
-   *
-   * @throws IllegalArgumentException if the glob is unsupported or contains no wildcard
-   */
-  public static String getGlobPrefix(String globExp) {
-    checkArgument(isGcsPatternSupported(globExp));
-    Matcher m = GLOB_PREFIX.matcher(globExp);
-    // Template form: the message is only formatted when the check fails.
-    checkArgument(m.matches(), "Glob expression: [%s] is not expandable.", globExp);
-    return m.group("PREFIX");
-  }
-
-  /**
-   * Expands glob expressions to regular expressions.
-   *
-   * <p>{@code *} matches any run of non-{@code /} characters and {@code ?} matches a single
-   * non-{@code /} character. A backslash escapes the following character; a backslash at the
-   * very end of the glob is treated as an escaped (literal) backslash.
-   *
-   * @param globExp the glob expression to expand
-   * @return a string with the regular expression this glob expands to
-   */
-  public static String globToRegexp(String globExp) {
-    StringBuilder dst = new StringBuilder();
-    char[] src = globExp.toCharArray();
-    int i = 0;
-    while (i < src.length) {
-      char c = src[i++];
-      switch (c) {
-        case '*':
-          dst.append("[^/]*");
-          break;
-        case '?':
-          dst.append("[^/]");
-          break;
-        case '.':
-        case '+':
-        case '{':
-        case '}':
-        case '(':
-        case ')':
-        case '|':
-        case '^':
-        case '$':
-          // These need to be escaped in regular expressions
-          dst.append('\\').append(c);
-          break;
-        case '\\':
-          i = doubleSlashes(dst, src, i);
-          break;
-        default:
-          dst.append(c);
-          break;
-      }
-    }
-    return dst.toString();
-  }
-
-  /**
-   * Returns true if the given {@code spec} contains glob.
-   */
-  public static boolean isGlob(GcsPath spec) {
-    return GLOB_PREFIX.matcher(spec.getObject()).matches();
-  }
-
-  private GcsUtil(
-      Storage storageClient,
-      HttpRequestInitializer httpRequestInitializer,
-      ExecutorService executorService,
-      @Nullable Integer uploadBufferSizeBytes) {
-    this.storageClient = storageClient;
-    this.httpRequestInitializer = httpRequestInitializer;
-    this.uploadBufferSizeBytes = uploadBufferSizeBytes;
-    this.executorService = executorService;
-  }
-
-  // Use this only for testing purposes.
-  protected void setStorageClient(Storage storageClient) {
-    this.storageClient = storageClient;
-  }
-
-  /**
-   * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded
-   * in the result. For patterns that only match a single object, we ensure that the object
-   * exists.
-   *
-   * <p>Objects whose names end with {@code /} (directory placeholders) are never returned.
-   *
-   * @return the matched paths; empty if a non-glob path does not exist
-   * @throws IllegalArgumentException if the pattern uses an unsupported recursive wildcard
-   * @throws IOException if a metadata or list request fails
-   */
-  public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
-    checkArgument(isGcsPatternSupported(gcsPattern.getObject()));
-    if (!isGlob(gcsPattern)) {
-      // Not a glob.
-      try {
-        // Use a get request to fetch the metadata of the object, and ignore the return value.
-        // The request has strong global consistency.
-        getObject(gcsPattern);
-        return ImmutableList.of(gcsPattern);
-      } catch (FileNotFoundException e) {
-        // If the path was not found, return an empty list.
-        return ImmutableList.of();
-      }
-    }
-
-    // Part before the first wildcard character; narrows the server-side listing.
-    String prefix = getGlobPrefix(gcsPattern.getObject());
-    Pattern p = Pattern.compile(globToRegexp(gcsPattern.getObject()));
-
-    LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
-        prefix, p.toString());
-
-    String pageToken = null;
-    List<GcsPath> results = new ArrayList<>();
-    do {
-      Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken);
-      if (objects.getItems() == null) {
-        break;
-      }
-
-      // Filter objects based on the regex.
-      for (StorageObject o : objects.getItems()) {
-        String name = o.getName();
-        // Skip directories, which end with a slash.
-        if (p.matcher(name).matches() && !name.endsWith("/")) {
-          LOG.debug("Matched object: {}", name);
-          results.add(GcsPath.fromObject(o));
-        }
-      }
-      pageToken = objects.getNextPageToken();
-    } while (pageToken != null);
-
-    return results;
-  }
-
-  @VisibleForTesting
-  @Nullable
-  Integer getUploadBufferSizeBytes() {
-    return uploadBufferSizeBytes;
-  }
-
-  /**
-   * Returns the file size from GCS or throws {@link FileNotFoundException}
-   * if the resource does not exist.
-   */
-  public long fileSize(GcsPath path) throws IOException {
-    return getObject(path).getSize().longValue();
-  }
-
-  /**
-   * Returns the {@link StorageObject} for the given {@link GcsPath}.
-   *
-   * @throws FileNotFoundException if the object does not exist
-   * @throws IOException if the request fails for any other reason
-   */
-  public StorageObject getObject(GcsPath gcsPath) throws IOException {
-    return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
-  }
-
-  /**
-   * Fetches object metadata, retrying socket errors according to {@code backoff}. A "not found"
-   * error becomes {@link FileNotFoundException}. Any other failure, including interruption (which
-   * also restores the thread's interrupt flag), is wrapped in an {@link IOException}.
-   */
-  @VisibleForTesting
-  StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException {
-    Storage.Objects.Get getObject =
-        storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());
-    try {
-      return ResilientOperation.retry(
-          ResilientOperation.getGoogleRequestCallable(getObject),
-          backoff,
-          RetryDeterminer.SOCKET_ERRORS,
-          IOException.class,
-          sleeper);
-    } catch (IOException | InterruptedException e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {
-        throw new FileNotFoundException(gcsPath.toString());
-      }
-      throw new IOException(
-          String.format("Unable to get the file object for path %s.", gcsPath),
-          e);
-    }
-  }
-
-  /**
-   * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} for the given
-   * {@link GcsPath GcsPaths}, in the same order as {@code gcsPaths}. Per-object failures are
-   * reported in the corresponding element rather than thrown.
-   *
-   * @throws IOException if executing the batch requests themselves fails
-   */
-  public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths)
-      throws IOException {
-    List<StorageObjectOrIOException[]> results = new ArrayList<>();
-    executeBatches(makeGetBatches(gcsPaths, results));
-    ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder();
-    for (StorageObjectOrIOException[] result : results) {
-      ret.add(result[0]);
-    }
-    return ret.build();
-  }
-
-  /**
-   * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}.
-   *
-   * <p>Returns a single page of at most {@code MAX_LIST_ITEMS_PER_CALL} items, including objects
-   * in "sub-directories" under the prefix. Pass the returned next-page token back in to continue.
-   *
-   * @throws IOException if the list request fails or is interrupted (in which case the thread's
-   *     interrupt flag is restored)
-   */
-  public Objects listObjects(String bucket, String prefix, @Nullable String pageToken)
-      throws IOException {
-    // List all objects that start with the prefix (including objects in sub-directories).
-    Storage.Objects.List listObject = storageClient.objects().list(bucket);
-    listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);
-    listObject.setPrefix(prefix);
-
-    if (pageToken != null) {
-      listObject.setPageToken(pageToken);
-    }
-
-    try {
-      return ResilientOperation.retry(
-          ResilientOperation.getGoogleRequestCallable(listObject),
-          BACKOFF_FACTORY.backoff(),
-          RetryDeterminer.SOCKET_ERRORS,
-          IOException.class);
-    } catch (Exception e) {
-      if (e instanceof InterruptedException) {
-        // Preserve the interrupt for callers up the stack; it would otherwise be lost by wrapping.
-        Thread.currentThread().interrupt();
-      }
-      throw new IOException(
-          String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix),
-          e);
-    }
-  }
-
-  /**
-   * Returns the sizes of the given objects, in the same order as {@code paths}, fetched with
-   * batched get requests.
-   *
-   * @throws FileNotFoundException if an object does not exist (the first failing path, in order)
-   * @throws IOException if any other per-object or batch failure occurs
-   */
-  @VisibleForTesting
-  List<Long> fileSizes(List<GcsPath> paths) throws IOException {
-    List<StorageObjectOrIOException> results = getObjects(paths);
-
-    ImmutableList.Builder<Long> ret = ImmutableList.builder();
-    for (StorageObjectOrIOException result : results) {
-      ret.add(toFileSize(result));
-    }
-    return ret.build();
-  }
-
-  /** Unwraps a size from the result, rethrowing its {@link IOException} if it holds one. */
-  private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException)
-      throws IOException {
-    if (storageObjectOrIOException.ioException() != null) {
-      throw storageObjectOrIOException.ioException();
-    } else {
-      return storageObjectOrIOException.storageObject().getSize().longValue();
-    }
-  }
-
-  /**
-   * Opens an object in GCS.
-   *
-   * <p>Returns a SeekableByteChannel that provides access to data in the bucket.
-   *
-   * @param path the GCS filename to read from
-   * @return a SeekableByteChannel that can read the object data
-   */
-  public SeekableByteChannel open(GcsPath path)
-      throws IOException {
-    return new GoogleCloudStorageReadChannel(storageClient, path.getBucket(),
-            path.getObject(), errorExtractor,
-            new ClientRequestHelper<StorageObject>());
-  }
-
-  /**
-   * Creates an object in GCS.
-   *
-   * <p>Returns a WritableByteChannel that can be used to write data to the
-   * object. The channel uses this instance's executor service and, when configured, its
-   * upload buffer size; it is initialized before being returned.
-   *
-   * @param path the GCS file to write to
-   * @param type the type of object, eg "text/plain".
-   * @return a WritableByteChannel for writing the object's contents
-   */
-  public WritableByteChannel create(GcsPath path,
-      String type) throws IOException {
-    GoogleCloudStorageWriteChannel channel = new GoogleCloudStorageWriteChannel(
-        executorService,
-        storageClient,
-        new ClientRequestHelper<StorageObject>(),
-        path.getBucket(),
-        path.getObject(),
-        AsyncWriteChannelOptions.newBuilder().build(),
-        new ObjectWriteConditions(),
-        Collections.<String, String>emptyMap(),
-        type);
-    if (uploadBufferSizeBytes != null) {
-      channel.setUploadBufferSize(uploadBufferSizeBytes);
-    }
-    channel.initialize();
-    return channel;
-  }
-
-  /**
-   * Returns whether the GCS bucket exists and is accessible.
-   */
-  public boolean bucketAccessible(GcsPath path) throws IOException {
-    return bucketAccessible(
-        path,
-        BACKOFF_FACTORY.backoff(),
-        Sleeper.DEFAULT);
-  }
-
-  /**
-   * Returns the project number of the project which owns this bucket.
-   * If the bucket exists, it must be accessible otherwise the permissions
-   * exception will be propagated.  If the bucket does not exist, an exception
-   * will be thrown.
-   *
-   * @throws java.nio.file.AccessDeniedException if the bucket is not accessible
-   * @throws FileNotFoundException if the bucket does not exist
-   */
-  public long bucketOwner(GcsPath path) throws IOException {
-    return getBucket(
-        path,
-        BACKOFF_FACTORY.backoff(),
-        Sleeper.DEFAULT).getProjectNumber().longValue();
-  }
-
-  /**
-   * Creates a {@link Bucket} under the specified project in Cloud Storage or
-   * propagates an exception.
-   */
-  public void createBucket(String projectId, Bucket bucket) throws IOException {
-    createBucket(
-        projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
-  }
-
-  /**
-   * Returns whether the GCS bucket exists. This will return false if the bucket
-   * is inaccessible due to permissions.
-   */
-  @VisibleForTesting
-  boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
-    try {
-      return getBucket(path, backoff, sleeper) != null;
-    } catch (AccessDeniedException | FileNotFoundException e) {
-      return false;
-    }
-  }
-
-  /**
-   * Fetches bucket metadata, retrying socket errors but not "not found" or "access denied".
-   * Those two are translated to {@link FileNotFoundException} and {@link AccessDeniedException}
-   * respectively. Interruption restores the thread's interrupt flag and is wrapped in an
-   * {@link IOException}.
-   */
-  @VisibleForTesting
-  @Nullable
-  Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
-    Storage.Buckets.Get getBucket =
-        storageClient.buckets().get(path.getBucket());
-
-    try {
-      Bucket bucket = ResilientOperation.retry(
-          ResilientOperation.getGoogleRequestCallable(getBucket),
-          backoff,
-          new RetryDeterminer<IOException>() {
-            @Override
-            public boolean shouldRetry(IOException e) {
-              // Permanent outcomes: retrying cannot change the answer.
-              if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
-                return false;
-              }
-              return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
-            }
-          },
-          IOException.class,
-          sleeper);
-
-      return bucket;
-    } catch (GoogleJsonResponseException e) {
-      if (errorExtractor.accessDenied(e)) {
-        throw new AccessDeniedException(path.toString(), null, e.getMessage());
-      }
-      if (errorExtractor.itemNotFound(e)) {
-        throw new FileNotFoundException(e.getMessage());
-      }
-      throw e;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException(
-          String.format("Error while attempting to verify existence of bucket gs://%s",
-              path.getBucket()), e);
-    }
-  }
-
-  /**
-   * Inserts {@code bucket} under {@code projectId} with "projectPrivate" bucket and default
-   * object ACLs. Socket errors are retried, but "already exists" and "access denied" are not.
-   * Those two are translated to {@link FileAlreadyExistsException} and
-   * {@link AccessDeniedException} respectively.
-   */
-  @VisibleForTesting
-  void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper)
-        throws IOException {
-    Storage.Buckets.Insert insertBucket =
-        storageClient.buckets().insert(projectId, bucket);
-    insertBucket.setPredefinedAcl("projectPrivate");
-    insertBucket.setPredefinedDefaultObjectAcl("projectPrivate");
-
-    try {
-      ResilientOperation.retry(
-          ResilientOperation.getGoogleRequestCallable(insertBucket),
-          backoff,
-          new RetryDeterminer<IOException>() {
-            @Override
-            public boolean shouldRetry(IOException e) {
-              // Permanent outcomes: retrying cannot change the answer.
-              if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) {
-                return false;
-              }
-              return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
-            }
-          },
-          IOException.class,
-          sleeper);
-    } catch (GoogleJsonResponseException e) {
-      if (errorExtractor.accessDenied(e)) {
-        throw new AccessDeniedException(bucket.getName(), null, e.getMessage());
-      }
-      if (errorExtractor.itemAlreadyExists(e)) {
-        throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage());
-      }
-      throw e;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException(
-          String.format("Error while attempting to create bucket gs://%s for project %s",
-              bucket.getName(), projectId), e);
-    }
-  }
-
-  /**
-   * Executes the given batches concurrently, on up to {@code MAX_CONCURRENT_BATCHES} threads, and
-   * waits for all of them to finish.
-   *
-   * @throws FileNotFoundException if a batch failed with a {@link FileNotFoundException}
-   * @throws IOException if any other batch failure occurs or the wait is interrupted (the
-   *     thread's interrupt flag is restored in that case)
-   */
-  private static void executeBatches(List<BatchRequest> batches) throws IOException {
-    // NOTE(review): a new pool is created per call, and Guava's getExitingExecutorService is
-    // documented to register a JVM shutdown hook each time; if this runs frequently, consider
-    // a shared executor — confirm before changing.
-    ListeningExecutorService executor = MoreExecutors.listeningDecorator(
-        MoreExecutors.getExitingExecutorService(
-            new ThreadPoolExecutor(MAX_CONCURRENT_BATCHES, MAX_CONCURRENT_BATCHES,
-                0L, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<Runnable>())));
-
-    List<ListenableFuture<Void>> futures = new LinkedList<>();
-    for (final BatchRequest batch : batches) {
-      futures.add(executor.submit(new Callable<Void>() {
-        @Override
-        public Void call() throws IOException {
-          batch.execute();
-          return null;
-        }
-      }));
-    }
-
-    try {
-      Futures.allAsList(futures).get();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted while executing batch GCS request", e);
-    } catch (ExecutionException e) {
-      if (e.getCause() instanceof FileNotFoundException) {
-        throw (FileNotFoundException) e.getCause();
-      }
-      throw new IOException("Error executing batch GCS request", e);
-    } finally {
-      executor.shutdown();
-    }
-  }
-
-  /**
-   * Makes get {@link BatchRequest BatchRequests}.
-   *
-   * <p>Paths are split into batches of at most {@code MAX_REQUESTS_PER_BATCH} requests. For each
-   * path, in order, a one-element holder array is appended to {@code results}; it is filled in
-   * when the batch containing that request executes.
-   *
-   * @param paths {@link GcsPath GcsPaths}.
-   * @param results mutable {@link List} for return values.
-   * @return {@link BatchRequest BatchRequests} to execute.
-   * @throws IOException if a request cannot be queued
-   */
-  @VisibleForTesting
-  List<BatchRequest> makeGetBatches(
-      Collection<GcsPath> paths,
-      List<StorageObjectOrIOException[]> results) throws IOException {
-    List<BatchRequest> batches = new LinkedList<>();
-    for (List<GcsPath> filesToGet :
-        Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
-      BatchRequest batch = createBatchRequest();
-      for (GcsPath path : filesToGet) {
-        results.add(enqueueGetFileSize(path, batch));
-      }
-      batches.add(batch);
-    }
-    return batches;
-  }
-
-  /**
-   * Copies each source URI to the destination URI at the same position, using batched requests.
-   *
-   * @throws IllegalArgumentException if the two iterables have different lengths
-   * @throws IOException if any copy fails
-   */
-  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
-      throws IOException {
-    executeBatches(makeCopyBatches(srcFilenames, destFilenames));
-  }
-
-  /**
-   * Builds copy batches of at most {@code MAX_REQUESTS_PER_BATCH} requests, pairing sources and
-   * destinations by position. No empty batch is returned.
-   */
-  List<BatchRequest> makeCopyBatches(Iterable<String> srcFilenames, Iterable<String> destFilenames)
-      throws IOException {
-    List<String> srcList = Lists.newArrayList(srcFilenames);
-    List<String> destList = Lists.newArrayList(destFilenames);
-    checkArgument(
-        srcList.size() == destList.size(),
-        "Number of source files %s must equal number of destination files %s",
-        srcList.size(),
-        destList.size());
-
-    List<BatchRequest> batches = new LinkedList<>();
-    BatchRequest batch = createBatchRequest();
-    for (int i = 0; i < srcList.size(); i++) {
-      final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i));
-      final GcsPath destPath = GcsPath.fromUri(destList.get(i));
-      enqueueCopy(sourcePath, destPath, batch);
-      if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
-        batches.add(batch);
-        batch = createBatchRequest();
-      }
-    }
-    if (batch.size() > 0) {
-      batches.add(batch);
-    }
-    return batches;
-  }
-
-  /** Builds delete batches of at most {@code MAX_REQUESTS_PER_BATCH} requests. */
-  List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws IOException {
-    List<BatchRequest> batches = new LinkedList<>();
-    for (List<String> filesToDelete :
-        Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) {
-      BatchRequest batch = createBatchRequest();
-      for (String file : filesToDelete) {
-        enqueueDelete(GcsPath.fromUri(file), batch);
-      }
-      batches.add(batch);
-    }
-    return batches;
-  }
-
-  /**
-   * Deletes the objects named by the given GCS URIs, using batched requests.
-   *
-   * @throws IOException if any delete fails
-   */
-  public void remove(Collection<String> filenames) throws IOException {
-    executeBatches(makeRemoveBatches(filenames));
-  }
-
-  /**
-   * Queues a get request for {@code path} on {@code batch} and returns a one-element holder that
-   * the batch callback fills in. A "not found" error becomes {@link FileNotFoundException}; other
-   * errors become a plain {@link IOException}.
-   */
-  private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchRequest batch)
-      throws IOException {
-    final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1];
-
-    Storage.Objects.Get getRequest = storageClient.objects()
-        .get(path.getBucket(), path.getObject());
-    getRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
-      @Override
-      public void onSuccess(StorageObject response, HttpHeaders httpHeaders) throws IOException {
-        ret[0] = StorageObjectOrIOException.create(response);
-      }
-
-      @Override
-      public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException {
-        IOException ioException;
-        if (errorExtractor.itemNotFound(e)) {
-          ioException = new FileNotFoundException(path.toString());
-        } else {
-          ioException = new IOException(String.format("Error trying to get %s: %s", path, e));
-        }
-        ret[0] = StorageObjectOrIOException.create(ioException);
-      }
-    });
-    return ret;
-  }
-
-  /**
-   * A class that holds either a {@link StorageObject} or an {@link IOException}.
-   * Exactly one of the two accessors returns non-null.
-   */
-  @AutoValue
-  public abstract static class StorageObjectOrIOException {
-
-    /**
-     * Returns the {@link StorageObject}, or {@code null} if this holds an exception.
-     */
-    @Nullable
-    public abstract StorageObject storageObject();
-
-    /**
-     * Returns the {@link IOException}, or {@code null} if this holds an object.
-     */
-    @Nullable
-    public abstract IOException ioException();
-
-    @VisibleForTesting
-    public static StorageObjectOrIOException create(StorageObject storageObject) {
-      return new AutoValue_GcsUtil_StorageObjectOrIOException(
-          checkNotNull(storageObject, "storageObject"),
-          null /* ioException */);
-    }
-
-    @VisibleForTesting
-    public static StorageObjectOrIOException create(IOException ioException) {
-      return new AutoValue_GcsUtil_StorageObjectOrIOException(
-          null /* storageObject */,
-          checkNotNull(ioException, "ioException"));
-    }
-  }
-
-  /**
-   * Queues a copy of {@code from} to {@code to} on {@code batch}. The failure callback throws an
-   * {@link IOException}.
-   */
-  private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch)
-      throws IOException {
-    Storage.Objects.Copy copyRequest = storageClient.objects()
-        .copy(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null);
-    copyRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
-      @Override
-      public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
-        LOG.debug("Successfully copied {} to {}", from, to);
-      }
-
-      @Override
-      public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
-        throw new IOException(
-            String.format("Error trying to copy %s to %s: %s", from, to, e));
-      }
-    });
-  }
-
-  /**
-   * Queues a delete of {@code file} on {@code batch}. The failure callback throws an
-   * {@link IOException}.
-   */
-  private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException {
-    Storage.Objects.Delete deleteRequest = storageClient.objects()
-        .delete(file.getBucket(), file.getObject());
-    deleteRequest.queue(batch, new JsonBatchCallback<Void>() {
-      @Override
-      public void onSuccess(Void obj, HttpHeaders responseHeaders) {
-        LOG.debug("Successfully deleted {}", file);
-      }
-
-      @Override
-      public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
-        throw new IOException(String.format("Error trying to delete %s: %s", file, e));
-      }
-    });
-  }
-
-  /** Creates an empty batch request using this instance's HTTP request initializer. */
-  private BatchRequest createBatchRequest() {
-    return storageClient.batch(httpRequestInitializer);
-  }
-
-  /**
-   * Handles a glob backslash escape. {@code i} is the index just past the backslash.
-   * Emits a regex-escaped copy of the next character and returns the index after it. If the
-   * backslash was the last character, emits an escaped literal backslash and returns {@code i}
-   * unchanged.
-   */
-  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
-    // Emit the next character without special interpretation
-    dst.append('\\');
-    // Previously `(i - 1) != src.length`, which is always true and indexed past the end of
-    // src for a trailing backslash.
-    if (i < src.length) {
-      dst.append(src[i]);
-      i++;
-    } else {
-      // A backslash at the very end is treated like an escaped backslash
-      dst.append('\\');
-    }
-    return i;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
deleted file mode 100644
index 2b7135e..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.http.HttpBackOffIOExceptionHandler;
-import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpResponseInterceptor;
-import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.ExponentialBackOff;
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An {@link HttpRequestInitializer} that installs retry handling on every
- * {@link HttpRequest} it initializes.
- *
- * <p>Each initialized request gets:
- * <ul>
- *   <li>a read timeout of {@code HANGING_GET_TIMEOUT_SEC} seconds, to accommodate hanging gets;
- *   <li>exponential back-off retries on 5xx and 429 responses;
- *   <li>retries without delay on {@link IOException}s;
- *   <li>optionally, a caller-supplied {@link HttpResponseInterceptor}.
- * </ul>
- *
- * <p>Failed responses that will not be retried are logged as warnings, unless their status code
- * is in the ignored set: 307 and 308 by default, plus any codes passed to the constructor.
- */
-public class RetryHttpRequestInitializer implements HttpRequestInitializer {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RetryHttpRequestInitializer.class);
-
-  /**
-   * Http response codes whose non-retried failures are not logged by default.
-   *
-   * <p>Unmodifiable. Each instance copies it into its own {@code ignoredResponseCodes} set,
-   * so per-instance additions can never leak into the shared defaults.
-   */
-  private static final Set<Integer> DEFAULT_IGNORED_RESPONSE_CODES =
-      Collections.unmodifiableSet(new HashSet<>(
-          Arrays.asList(307 /* Redirect, handled by the client library */,
-                        308 /* Resume Incomplete, handled by the client library */)));
-
-  /**
-   * Http response timeout to use for hanging gets.
-   */
-  private static final int HANGING_GET_TIMEOUT_SEC = 80;
-
-  /**
-   * An {@link HttpBackOffIOExceptionHandler} that logs whether each {@link IOException} will be
-   * retried: at debug level when it will, at warn level when it will not.
-   */
-  private static class LoggingHttpBackOffIOExceptionHandler
-      extends HttpBackOffIOExceptionHandler {
-    public LoggingHttpBackOffIOExceptionHandler(BackOff backOff) {
-      super(backOff);
-    }
-
-    @Override
-    public boolean handleIOException(HttpRequest request, boolean supportsRetry)
-        throws IOException {
-      boolean willRetry = super.handleIOException(request, supportsRetry);
-      if (willRetry) {
-        LOG.debug("Request failed with IOException, will retry: {}", request.getUrl());
-      } else {
-        LOG.warn("Request failed with IOException, will NOT retry: {}", request.getUrl());
-      }
-      return willRetry;
-    }
-  }
-
-  /**
-   * Delegates to an {@link HttpBackOffUnsuccessfulResponseHandler} that backs off only on 5xx
-   * and 429 responses.
-   *
-   * <p>Logs retried responses at debug level. Logs non-retried responses at warn level unless
-   * their status code is in {@code ignoredResponseCodes}.
-   */
-  private static class LoggingHttpBackoffUnsuccessfulResponseHandler
-      implements HttpUnsuccessfulResponseHandler {
-    private final HttpBackOffUnsuccessfulResponseHandler handler;
-    private final Set<Integer> ignoredResponseCodes;
-
-    public LoggingHttpBackoffUnsuccessfulResponseHandler(BackOff backoff,
-        Sleeper sleeper, Set<Integer> ignoredResponseCodes) {
-      this.ignoredResponseCodes = ignoredResponseCodes;
-      handler = new HttpBackOffUnsuccessfulResponseHandler(backoff);
-      handler.setSleeper(sleeper);
-      handler.setBackOffRequired(
-          new HttpBackOffUnsuccessfulResponseHandler.BackOffRequired() {
-            @Override
-            public boolean isRequired(HttpResponse response) {
-              int statusCode = response.getStatusCode();
-              return (statusCode / 100 == 5) ||  // 5xx: server error
-                  statusCode == 429;             // 429: Too many requests
-            }
-          });
-    }
-
-    @Override
-    public boolean handleResponse(HttpRequest request, HttpResponse response,
-        boolean supportsRetry) throws IOException {
-      boolean retry = handler.handleResponse(request, response, supportsRetry);
-      if (retry) {
-        LOG.debug("Request failed with code {} will retry: {}",
-            response.getStatusCode(), request.getUrl());
-
-      } else if (!ignoredResponseCodes.contains(response.getStatusCode())) {
-        LOG.warn("Request failed with code {}, will NOT retry: {}",
-            response.getStatusCode(), request.getUrl());
-      }
-
-      return retry;
-    }
-  }
-
-  /** Interceptor applied to every response, or {@code null} for none. */
-  @Nullable
-  private final HttpResponseInterceptor responseInterceptor;
-
-  private final NanoClock nanoClock;  // used for testing
-
-  private final Sleeper sleeper;  // used for testing
-
-  /** The default ignored codes plus any passed to the constructor; filled in once at construction. */
-  private final Set<Integer> ignoredResponseCodes =
-      new HashSet<>(DEFAULT_IGNORED_RESPONSE_CODES);
-
-  /** Creates an initializer with only the default ignored response codes and no interceptor. */
-  public RetryHttpRequestInitializer() {
-    this(Collections.<Integer>emptyList());
-  }
-
-  /**
-   * Creates an initializer with no response interceptor.
-   *
-   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
-   */
-  public RetryHttpRequestInitializer(Collection<Integer> additionalIgnoredResponseCodes) {
-    this(additionalIgnoredResponseCodes, null);
-  }
-
-  /**
-   * Creates an initializer that uses the system clock and the default sleeper.
-   *
-   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
-   * @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null.
-   */
-  public RetryHttpRequestInitializer(
-      Collection<Integer> additionalIgnoredResponseCodes,
-      @Nullable HttpResponseInterceptor responseInterceptor) {
-    this(NanoClock.SYSTEM, Sleeper.DEFAULT, additionalIgnoredResponseCodes,
-        responseInterceptor);
-  }
-
-  /**
-   * Visible for testing.
-   *
-   * @param nanoClock used as a timing source for knowing how much time has elapsed.
-   * @param sleeper used to sleep between retries.
-   * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged.
-   * @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null.
-   */
-  RetryHttpRequestInitializer(
-      NanoClock nanoClock, Sleeper sleeper, Collection<Integer> additionalIgnoredResponseCodes,
-      @Nullable HttpResponseInterceptor responseInterceptor) {
-    this.nanoClock = nanoClock;
-    this.sleeper = sleeper;
-    this.ignoredResponseCodes.addAll(additionalIgnoredResponseCodes);
-    this.responseInterceptor = responseInterceptor;
-  }
-
-  /**
-   * Installs the read timeout, the retry handlers and, if configured, the response interceptor
-   * on {@code request}.
-   */
-  @Override
-  public void initialize(HttpRequest request) throws IOException {
-    // Set a timeout for hanging-gets.
-    // TODO: Do this exclusively for work requests.
-    request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000);
-
-    // Back off on retryable http errors.
-    request.setUnsuccessfulResponseHandler(
-        // A back-off multiplier of 2 raises the maximum request retrying time
-        // to approximately 5 minutes (keeping other back-off parameters to
-        // their default values).
-        new LoggingHttpBackoffUnsuccessfulResponseHandler(
-            new ExponentialBackOff.Builder().setNanoClock(nanoClock)
-                                            .setMultiplier(2).build(),
-            sleeper, ignoredResponseCodes));
-
-    // Retry immediately on IOExceptions.
-    LoggingHttpBackOffIOExceptionHandler loggingBackoffHandler =
-        new LoggingHttpBackOffIOExceptionHandler(BackOff.ZERO_BACKOFF);
-    request.setIOExceptionHandler(loggingBackoffHandler);
-
-    // Set response initializer
-    if (responseInterceptor != null) {
-      request.setResponseInterceptor(responseInterceptor);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
deleted file mode 100644
index b8474bb..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.services.storage.Storage;
-import com.google.auth.Credentials;
-import com.google.auth.http.HttpCredentialsAdapter;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.security.GeneralSecurityException;
-import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-
-/**
- * Helpers for cloud communication.
- */
-public class Transport {
-
-  /**
-   * Holder for the shared JSON factory and HTTP transport.
-   *
-   * <p>The static initializer runs when the JVM initializes this class, which happens on first
-   * use of either field. A failure while creating them is rethrown as a {@link RuntimeException}.
-   */
-  private static class SingletonHelper {
-    /** Global instance of the JSON factory. */
-    private static final JsonFactory JSON_FACTORY;
-
-    /** Global instance of the HTTP transport. */
-    private static final HttpTransport HTTP_TRANSPORT;
-
-    static {
-      try {
-        JSON_FACTORY = JacksonFactory.getDefaultInstance();
-        HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();
-      } catch (GeneralSecurityException | IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  /** Returns the shared {@link HttpTransport}. */
-  public static HttpTransport getTransport() {
-    return SingletonHelper.HTTP_TRANSPORT;
-  }
-
-  /** Returns the shared {@link JsonFactory}. */
-  public static JsonFactory getJsonFactory() {
-    return SingletonHelper.JSON_FACTORY;
-  }
-
-  /** The root URL and service path of an API endpoint. */
-  private static class ApiComponents {
-    public final String rootUrl;
-    public final String servicePath;
-
-    public ApiComponents(String root, String path) {
-      this.rootUrl = root;
-      this.servicePath = path;
-    }
-  }
-
-  /**
-   * Splits {@code urlString} into a root URL ({@code protocol://host}, plus {@code :port} when a
-   * port is given) and a service path (the URL's path component).
-   *
-   * @throws RuntimeException if {@code urlString} is not a valid URL; the parse failure is
-   *     attached as the cause
-   */
-  private static ApiComponents apiComponentsFromUrl(String urlString) {
-    try {
-      URL url = new URL(urlString);
-      String rootUrl = url.getProtocol() + "://" + url.getHost()
-          + (url.getPort() > 0 ? ":" + url.getPort() : "");
-      return new ApiComponents(rootUrl, url.getPath());
-    } catch (MalformedURLException e) {
-      // Chain the parse failure so the reason the URL was rejected is not lost.
-      throw new RuntimeException("Invalid URL: " + urlString, e);
-    }
-  }
-
-  /**
-   * Returns a Cloud Storage client builder using the specified {@link GcsOptions}.
-   *
-   * <p>Requests are authorized with {@code options.getGcpCredential()}, or with a
-   * {@link NullCredentialInitializer} when that is {@code null}. They are then initialized by a
-   * {@link RetryHttpRequestInitializer} that does not log 404 responses. When
-   * {@code options.getGcsEndpoint()} is non-null, it overrides the builder's root URL and
-   * service path.
-   */
-  public static Storage.Builder
-      newStorageClient(GcsOptions options) {
-    String gcsEndpoint = options.getGcsEndpoint();
-    Storage.Builder storageBuilder = new Storage.Builder(getTransport(), getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log the code 404. Code up the stack will deal with 404's if needed, and
-            // logging it by default clutters the output during file staging.
-            new RetryHttpRequestInitializer(
-                ImmutableList.of(404), new UploadIdResponseInterceptor())))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-    if (gcsEndpoint != null) {
-      ApiComponents components = apiComponentsFromUrl(gcsEndpoint);
-      storageBuilder.setRootUrl(components.rootUrl);
-      storageBuilder.setServicePath(components.servicePath);
-    }
-    return storageBuilder;
-  }
-
-  /**
-   * Chains a credential initializer in front of {@code httpRequestInitializer}.
-   *
-   * <p>When {@code credential} is {@code null}, a {@link NullCredentialInitializer} takes its
-   * place; otherwise the credential is adapted with {@link HttpCredentialsAdapter}.
-   */
-  private static HttpRequestInitializer chainHttpRequestInitializer(
-      Credentials credential, HttpRequestInitializer httpRequestInitializer) {
-    if (credential == null) {
-      return new ChainingHttpRequestInitializer(
-          new NullCredentialInitializer(), httpRequestInitializer);
-    } else {
-      return new ChainingHttpRequestInitializer(
-          new HttpCredentialsAdapter(credential),
-          httpRequestInitializer);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
deleted file mode 100644
index 6a71bdc..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
+++ /dev/null
@@ -1,627 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.gcsfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-
-import com.google.api.services.storage.model.StorageObject;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.file.FileSystem;
-import java.nio.file.LinkOption;
-import java.nio.file.Path;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Iterator;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-/**
- * Implements the Java NIO {@link Path} API for Google Cloud Storage paths.
- *
- * <p>GcsPath uses a slash ('/') as a directory separator.  Below is
- * a summary of how slashes are treated:
- * <ul>
- *   <li> A GCS bucket may not contain a slash.  An object may contain zero or
- *        more slashes.
- *   <li> A trailing slash always indicates a directory, which is compliant
- *        with POSIX.1-2008.
- *   <li> Slashes separate components of a path.  Empty components are allowed,
- *        these are represented as repeated slashes.  An empty component always
- *        refers to a directory, and always ends in a slash.
- *   <li> {@link #getParent()} always returns a path ending in a slash, as the
- *        parent of a GcsPath is always a directory.
- *   <li> Use {@link #resolve(String)} to append elements to a GcsPath -- this
- *        applies the rules consistently and is highly recommended over any
- *        custom string concatenation.
- * </ul>
- *
- * <p>GcsPath treats all GCS objects and buckets as belonging to the same
- * filesystem, so the root of a GcsPath is the GcsPath bucket="", object="".
- *
- * <p>Relative paths are not associated with any bucket.  This matches common
- * treatment of Path in which relative paths can be constructed from one
- * filesystem and appended to another filesystem.
- *
- * <p>Only the bucket and object names take part in {@link #equals} and
- * {@link #hashCode}. The associated {@link FileSystem} is transient, so it is
- * not serialized.
- *
- * @see <a href=
- * "http://docs.oracle.com/javase/tutorial/essential/io/pathOps.html"
- * >Java Tutorials: Path Operations</a>
- */
-public class GcsPath implements Path, Serializable {
-
-  public static final String SCHEME = "gs";
-
-  /**
-   * Creates a GcsPath from a URI.
-   *
-   * <p>The URI must be in the form {@code gs://[bucket]/[path]}, and may not
-   * contain a port, user info, a query, or a fragment.
-   *
-   * @throws IllegalArgumentException if {@code uri} is not a valid GCS URI,
-   *     including when it has no scheme
-   */
-  public static GcsPath fromUri(URI uri) {
-    // Use SCHEME as the receiver so that a URI without a scheme is rejected with an
-    // IllegalArgumentException instead of a NullPointerException.
-    checkArgument(SCHEME.equalsIgnoreCase(uri.getScheme()), "URI: %s is not a GCS URI", uri);
-    // Guava's checkArgument only substitutes %s placeholders.
-    checkArgument(uri.getPort() == -1,
-        "GCS URI may not specify port: %s (%s)", uri, uri.getPort());
-    checkArgument(
-        isNullOrEmpty(uri.getUserInfo()),
-        "GCS URI may not specify userInfo: %s (%s)", uri, uri.getUserInfo());
-    checkArgument(
-        isNullOrEmpty(uri.getQuery()),
-        "GCS URI may not specify query: %s (%s)", uri, uri.getQuery());
-    checkArgument(
-        isNullOrEmpty(uri.getFragment()),
-        "GCS URI may not specify fragment: %s (%s)", uri, uri.getFragment());
-
-    return fromUri(uri.toString());
-  }
-
-  /**
-   * Pattern that is used to parse a GCS URL.
-   *
-   * <p>This is used to separate the components.  Verification is handled
-   * separately.
-   */
-  public static final Pattern GCS_URI =
-      Pattern.compile("(?<SCHEME>[^:]+)://(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
-
-  /**
-   * Creates a GcsPath from a URI in string form.
-   *
-   * <p>This does not use URI parsing, which means it may accept patterns that
-   * the URI parser would not accept.
-   *
-   * @throws IllegalArgumentException if {@code uri} does not match {@link #GCS_URI},
-   *     its scheme is not {@value #SCHEME}, or the bucket or object name is invalid
-   */
-  public static GcsPath fromUri(String uri) {
-    Matcher m = GCS_URI.matcher(uri);
-    checkArgument(m.matches(), "Invalid GCS URI: %s", uri);
-
-    checkArgument(m.group("SCHEME").equalsIgnoreCase(SCHEME),
-        "URI: %s is not a GCS URI", uri);
-    return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
-  }
-
-  /**
-   * Pattern that is used to parse a GCS resource name.
-   */
-  private static final Pattern GCS_RESOURCE_NAME =
-      Pattern.compile("storage.googleapis.com/(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
-
-  /**
-   * Creates a GcsPath from a OnePlatform resource name in string form, i.e.
-   * {@code storage.googleapis.com/[bucket]/[object]}.
-   *
-   * @throws IllegalArgumentException if {@code name} is not a valid resource name
-   */
-  public static GcsPath fromResourceName(String name) {
-    Matcher m = GCS_RESOURCE_NAME.matcher(name);
-    checkArgument(m.matches(), "Invalid GCS resource name: %s", name);
-
-    return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
-  }
-
-  /**
-   * Creates a GcsPath from a {@linkplain StorageObject}, using its bucket and name.
-   */
-  public static GcsPath fromObject(StorageObject object) {
-    return new GcsPath(null, object.getBucket(), object.getName());
-  }
-
-  /**
-   * Creates a GcsPath from bucket and object components.
-   *
-   * <p>A GcsPath without a bucket name is treated as a relative path, which
-   * is a path component with no linkage to the root element.  This is similar
-   * to a Unix path that does not begin with the root marker (a slash).
-   * GCS has different naming constraints and APIs for working with buckets and
-   * objects, so these two concepts are kept separate to avoid accidental
-   * attempts to treat objects as buckets, or vice versa, as much as possible.
-   *
-   * <p>A GcsPath without an object name is a bucket reference.
-   * A bucket is always a directory, which could be used to lookup or add
-   * files to a bucket, but could not be opened as a file.
-   *
-   * <p>A GcsPath containing neither bucket or object names is treated as
-   * the root of the GCS filesystem.  A listing on the root element would return
-   * the buckets available to the user.
-   *
-   * <p>If {@code null} is passed as either parameter, it is converted to an
-   * empty string internally for consistency.  There is no distinction between
-   * an empty string and a {@code null}, as neither are allowed by GCS.
-   *
-   * @param bucket a GCS bucket name, or none ({@code null} or an empty string)
-   *               if the object is not associated with a bucket
-   *               (e.g. relative paths or the root node).
-   * @param object a GCS object path, or none ({@code null} or an empty string)
-   *               for no object.
-   */
-  public static GcsPath fromComponents(@Nullable String bucket,
-                                       @Nullable String object) {
-    return new GcsPath(null, bucket, object);
-  }
-
-  @Nullable
-  private transient FileSystem fs;
-  @Nonnull
-  private final String bucket;
-  @Nonnull
-  private final String object;
-
-  /**
-   * Constructs a GcsPath.
-   *
-   * @param fs the associated FileSystem, if any
-   * @param bucket the associated bucket, or none ({@code null} or an empty
-   *               string) for a relative path component
-   * @param object the object, which is a fully-qualified object name if bucket
-   *               was also provided, or none ({@code null} or an empty string)
-   *               for no object
-   * @throws java.lang.IllegalArgumentException if the bucket or object name
-   *         is invalid.
-   */
-  public GcsPath(@Nullable FileSystem fs,
-                 @Nullable String bucket,
-                 @Nullable String object) {
-    if (bucket == null) {
-      bucket = "";
-    }
-    checkArgument(!bucket.contains("/"),
-        "GCS bucket may not contain a slash");
-    checkArgument(bucket.isEmpty()
-                || bucket.matches("[a-z0-9][-_a-z0-9.]+[a-z0-9]"),
-            "GCS bucket names must contain only lowercase letters, numbers, "
-                + "dashes (-), underscores (_), and dots (.). Bucket names "
-                + "must start and end with a number or letter. "
-                + "See https://developers.google.com/storage/docs/bucketnaming "
-                + "for more details.  Bucket name: " + bucket);
-
-    if (object == null) {
-      object = "";
-    }
-    checkArgument(
-        object.indexOf('\n') < 0 && object.indexOf('\r') < 0,
-        "GCS object names must not contain Carriage Return or "
-            + "Line Feed characters.");
-
-    this.fs = fs;
-    this.bucket = bucket;
-    this.object = object;
-  }
-
-  /**
-   * Returns the bucket name associated with this GCS path, or an empty string
-   * if this is a relative path component.
-   */
-  public String getBucket() {
-    return bucket;
-  }
-
-  /**
-   * Returns the object name associated with this GCS path, or an empty string
-   * if no object is specified.
-   */
-  public String getObject() {
-    return object;
-  }
-
-  /** Associates this path with {@code fs}; the association is not serialized. */
-  public void setFileSystem(FileSystem fs) {
-    this.fs = fs;
-  }
-
-  @Override
-  public FileSystem getFileSystem() {
-    return fs;
-  }
-
-  // Absolute paths are those that have a bucket and the root path.
-  @Override
-  public boolean isAbsolute() {
-    return !bucket.isEmpty() || object.isEmpty();
-  }
-
-  /** Returns the root path (empty bucket and object), keeping this path's file system. */
-  @Override
-  public GcsPath getRoot() {
-    return new GcsPath(fs, "", "");
-  }
-
-  /**
-   * Returns the last name component of this path.
-   *
-   * @throws UnsupportedOperationException if this path has fewer than two names
-   */
-  @Override
-  public GcsPath getFileName() {
-    int nameCount = getNameCount();
-    if (nameCount < 2) {
-      throw new UnsupportedOperationException(
-          "Can't get filename from root path in the bucket: " + this);
-    }
-    return getName(nameCount - 1);
-  }
-
-  /**
-   * Returns the <em>parent path</em>, or {@code null} if this path does not
-   * have a parent.
-   *
-   * <p>Returns a path that ends in '/', as the parent path always refers to
-   * a directory.
-   */
-  @Override
-  public GcsPath getParent() {
-    if (bucket.isEmpty() && object.isEmpty()) {
-      // The root path has no parent, by definition.
-      return null;
-    }
-
-    if (object.isEmpty()) {
-      // A GCS bucket. All buckets come from a common root.
-      return getRoot();
-    }
-
-    // Skip last character, in case it is a trailing slash.
-    int i = object.lastIndexOf('/', object.length() - 2);
-    if (i <= 0) {
-      if (bucket.isEmpty()) {
-        // Relative paths are not attached to the root node.
-        return null;
-      }
-      return new GcsPath(fs, bucket, "");
-    }
-
-    // Retain trailing slash.
-    return new GcsPath(fs, bucket, object.substring(0, i + 1));
-  }
-
-  /**
-   * Returns the number of name components: one for the bucket (if any), plus
-   * one per slash-separated object component. A trailing slash does not add
-   * a component.
-   */
-  @Override
-  public int getNameCount() {
-    int count = bucket.isEmpty() ? 0 : 1;
-    if (object.isEmpty()) {
-      return count;
-    }
-
-    // Add another for each separator found.
-    int index = -1;
-    while ((index = object.indexOf('/', index + 1)) != -1) {
-      count++;
-    }
-
-    return object.endsWith("/") ? count : count + 1;
-  }
-
-  /**
-   * Returns the name component at zero-based position {@code count}, in the
-   * order produced by {@link #iterator()}.
-   *
-   * @throws IllegalArgumentException if {@code count} is negative or out of range
-   */
-  @Override
-  public GcsPath getName(int count) {
-    checkArgument(count >= 0);
-
-    Iterator<Path> iterator = iterator();
-    for (int i = 0; i < count; ++i) {
-      checkArgument(iterator.hasNext());
-      iterator.next();
-    }
-
-    checkArgument(iterator.hasNext());
-    return (GcsPath) iterator.next();
-  }
-
-  /**
-   * Returns the name components in {@code [beginIndex, endIndex)} joined with
-   * {@link #resolve(Path)}.
-   *
-   * @throws IllegalArgumentException if the range is empty, negative, or out of range
-   */
-  @Override
-  public GcsPath subpath(int beginIndex, int endIndex) {
-    checkArgument(beginIndex >= 0);
-    checkArgument(endIndex > beginIndex);
-
-    Iterator<Path> iterator = iterator();
-    for (int i = 0; i < beginIndex; ++i) {
-      checkArgument(iterator.hasNext());
-      iterator.next();
-    }
-
-    GcsPath path = null;
-    while (beginIndex < endIndex) {
-      checkArgument(iterator.hasNext());
-      if (path == null) {
-        path = (GcsPath) iterator.next();
-      } else {
-        path = path.resolve(iterator.next());
-      }
-      ++beginIndex;
-    }
-
-    return path;
-  }
-
-  // Note: startsWith/endsWith compare "bucket/object" as plain strings, not
-  // component by component.
-  @Override
-  public boolean startsWith(Path other) {
-    if (other instanceof GcsPath) {
-      GcsPath gcsPath = (GcsPath) other;
-      return startsWith(gcsPath.bucketAndObject());
-    } else {
-      return startsWith(other.toString());
-    }
-  }
-
-  @Override
-  public boolean startsWith(String prefix) {
-    return bucketAndObject().startsWith(prefix);
-  }
-
-  @Override
-  public boolean endsWith(Path other) {
-    if (other instanceof GcsPath) {
-      GcsPath gcsPath = (GcsPath) other;
-      return endsWith(gcsPath.bucketAndObject());
-    } else {
-      return endsWith(other.toString());
-    }
-  }
-
-  @Override
-  public boolean endsWith(String suffix) {
-    return bucketAndObject().endsWith(suffix);
-  }
-
-  // TODO: support "." and ".." path components?
-  @Override
-  public GcsPath normalize() {
-    return this;
-  }
-
-  /**
-   * Resolves {@code other} against this path. An absolute {@link GcsPath} is
-   * returned unchanged; any other path is resolved by its string form.
-   */
-  @Override
-  public GcsPath resolve(Path other) {
-    if (other instanceof GcsPath) {
-      GcsPath path = (GcsPath) other;
-      if (path.isAbsolute()) {
-        return path;
-      } else {
-        return resolve(path.getObject());
-      }
-    } else {
-      return resolve(other.toString());
-    }
-  }
-
-  /**
-   * Resolves {@code other} against this path.
-   *
-   * <p>On the root path, {@code other} is read as {@code bucket/object}. A
-   * {@code gs://} URI is parsed and returned with this path's file system.
-   * Otherwise {@code other} is appended to the object name with exactly one
-   * separating slash; an empty {@code other} appends a directory slash.
-   */
-  @Override
-  public GcsPath resolve(String other) {
-    if (bucket.isEmpty() && object.isEmpty()) {
-      // Resolve on a root path is equivalent to looking up a bucket and object.
-      other = SCHEME + "://" + other;
-    }
-
-    if (other.startsWith(SCHEME + "://")) {
-      GcsPath path = GcsPath.fromUri(other);
-      path.setFileSystem(getFileSystem());
-      return path;
-    }
-
-    if (other.isEmpty()) {
-      // An empty component MUST refer to a directory.
-      other = "/";
-    }
-
-    if (object.isEmpty()) {
-      return new GcsPath(fs, bucket, other);
-    } else if (object.endsWith("/")) {
-      return new GcsPath(fs, bucket, object + other);
-    } else {
-      return new GcsPath(fs, bucket, object + "/" + other);
-    }
-  }
-
-  @Override
-  public Path resolveSibling(Path other) {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Resolves {@code other} against this path's parent.
-   *
-   * @throws UnsupportedOperationException if this path has fewer than two names
-   */
-  @Override
-  public Path resolveSibling(String other) {
-    if (getNameCount() < 2) {
-      throw new UnsupportedOperationException("Can't resolve the sibling of a root path: " + this);
-    }
-    GcsPath parent = getParent();
-    return (parent == null) ? fromUri(other) : parent.resolve(other);
-  }
-
-  @Override
-  public Path relativize(Path other) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public GcsPath toAbsolutePath() {
-    return this;
-  }
-
-  @Override
-  public GcsPath toRealPath(LinkOption... options) throws IOException {
-    return this;
-  }
-
-  @Override
-  public File toFile() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public WatchKey register(WatchService watcher, WatchEvent.Kind<?>[] events,
-      WatchEvent.Modifier... modifiers) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public WatchKey register(WatchService watcher, WatchEvent.Kind<?>... events)
-      throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Iterates over the name components. If this path has a bucket, it is the
-   * first component; every later component is a relative (bucket-less) path.
-   */
-  @Override
-  public Iterator<Path> iterator() {
-    return new NameIterator(fs, !bucket.isEmpty(), bucketAndObject());
-  }
-
-  /** Splits a "bucket/object" string on slashes, one component per call to {@link #next()}. */
-  private static class NameIterator implements Iterator<Path> {
-    private final FileSystem fs;
-    private boolean fullPath;
-    private String name;
-
-    NameIterator(FileSystem fs, boolean fullPath, String name) {
-      this.fs = fs;
-      this.fullPath = fullPath;
-      this.name = name;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return !isNullOrEmpty(name);
-    }
-
-    @Override
-    public GcsPath next() {
-      if (!hasNext()) {
-        // Honor the Iterator contract; otherwise name.indexOf below would throw a
-        // NullPointerException once the components are exhausted.
-        throw new java.util.NoSuchElementException();
-      }
-      int i = name.indexOf('/');
-      String component;
-      if (i >= 0) {
-        component = name.substring(0, i);
-        name = name.substring(i + 1);
-      } else {
-        component = name;
-        name = null;
-      }
-      if (fullPath) {
-        fullPath = false;
-        return new GcsPath(fs, component, "");
-      } else {
-        // Relative paths have no bucket.
-        return new GcsPath(fs, "", component);
-      }
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  /**
-   * Orders by bucket, then component by component.
-   *
-   * @throws ClassCastException if {@code other} is not a {@link GcsPath}
-   */
-  @Override
-  public int compareTo(Path other) {
-    if (!(other instanceof GcsPath)) {
-      throw new ClassCastException();
-    }
-
-    GcsPath path = (GcsPath) other;
-    int b = bucket.compareTo(path.bucket);
-    if (b != 0) {
-      return b;
-    }
-
-    // Compare a component at a time, so that the separator char doesn't
-    // get compared against component contents.  Eg, "a/b" < "a-1/b".
-    Iterator<Path> left = iterator();
-    Iterator<Path> right = path.iterator();
-
-    while (left.hasNext() && right.hasNext()) {
-      String leftStr = left.next().toString();
-      String rightStr = right.next().toString();
-      int c = leftStr.compareTo(rightStr);
-      if (c != 0) {
-        return c;
-      }
-    }
-
-    if (!left.hasNext() && !right.hasNext()) {
-      return 0;
-    } else {
-      return left.hasNext() ? 1 : -1;
-    }
-  }
-
-  /** Two paths are equal when their bucket and object names match; the file system is ignored. */
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    GcsPath paths = (GcsPath) o;
-    return bucket.equals(paths.bucket) && object.equals(paths.object);
-  }
-
-  @Override
-  public int hashCode() {
-    int result = bucket.hashCode();
-    result = 31 * result + object.hashCode();
-    return result;
-  }
-
-  /**
-   * Returns {@code gs://bucket/object} for absolute paths and just the object
-   * name for relative paths.
-   */
-  @Override
-  public String toString() {
-    if (!isAbsolute()) {
-      return object;
-    }
-    StringBuilder sb = new StringBuilder();
-    sb.append(SCHEME)
-        .append("://");
-    if (!bucket.isEmpty()) {
-      sb.append(bucket)
-          .append('/');
-    }
-    sb.append(object);
-    return sb.toString();
-  }
-
-  /** Returns this path as {@code storage.googleapis.com/bucket/object}. */
-  // TODO: Consider using resource names for all GCS paths used by the SDK.
-  public String toResourceName() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("storage.googleapis.com/");
-    if (!bucket.isEmpty()) {
-      sb.append(bucket).append('/');
-    }
-    sb.append(object);
-    return sb.toString();
-  }
-
-  /**
-   * Returns this path as a {@code gs} URI.
-   *
-   * @throws RuntimeException if the URI cannot be built; the syntax error is
-   *     attached as the cause
-   */
-  @Override
-  public URI toUri() {
-    try {
-      return new URI(SCHEME, "//" + bucketAndObject(), null);
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Unable to create URI for GCS path " + this, e);
-    }
-  }
-
-  /** Returns "bucket/object", or just the object name when there is no bucket. */
-  private String bucketAndObject() {
-    if (bucket.isEmpty()) {
-      return object;
-    } else {
-      return bucket + "/" + object;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
deleted file mode 100644
index 4d49f8c..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/** Defines utilities used to interact with Google Cloud Storage. */
-package org.apache.beam.sdk.util.gcsfs;

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java
deleted file mode 100644
index f8135e7..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/** Defines Google Cloud Platform component utilities that can be used by Beam runners. */
-package org.apache.beam.sdk.util;

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
deleted file mode 100644
index a8772c3..0000000
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam;
-
-import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import com.google.common.collect.ImmutableSet;
-import java.util.Set;
-import org.apache.beam.sdk.util.ApiSurface;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Verifies that the API surface of the Google Cloud Platform core components only uses classes
- * from the packages allowed below.
- */
-@RunWith(JUnit4.class)
-public class GcpCoreApiSurfaceTest {
-
-  @Test
-  public void testApiSurface() throws Exception {
-    @SuppressWarnings("unchecked")
-    Set<String> allowedPackages =
-        ImmutableSet.of(
-            "org.apache.beam",
-            "com.google.api.client",
-            "com.google.api.services.storage",
-            "com.google.auth",
-            "com.fasterxml.jackson.annotation",
-            "com.fasterxml.jackson.core",
-            "com.fasterxml.jackson.databind",
-            "org.apache.avro",
-            "org.hamcrest",
-            // via DataflowMatchers
-            "org.codehaus.jackson",
-            // via Avro
-            "org.joda.time",
-            "org.junit");
-
-    ClassLoader classLoader = getClass().getClassLoader();
-    assertThat(ApiSurface.getSdkApiSurface(classLoader), containsOnlyPackages(allowedPackages));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java
deleted file mode 100644
index 6f0846e..0000000
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.gcp.auth;
-
-import com.google.auth.Credentials;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Fake {@link Credentials} for tests. It reports the authentication type {@code "Test"}, never
- * supplies request metadata, and ignores refresh requests.
- */
-public class TestCredential extends Credentials {
-  @Override
-  public String getAuthenticationType() {
-    return "Test";
-  }
-
-  @Override
-  public boolean hasRequestMetadata() {
-    return false;
-  }
-
-  @Override
-  public boolean hasRequestMetadataOnly() {
-    return true;
-  }
-
-  /** Always returns an empty, immutable metadata map. */
-  @Override
-  public Map<String, List<String>> getRequestMetadata() throws IOException {
-    return Collections.<String, List<String>>emptyMap();
-  }
-
-  /** Always returns an empty, immutable metadata map, regardless of {@code uri}. */
-  @Override
-  public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException {
-    return Collections.<String, List<String>>emptyMap();
-  }
-
-  /** Does nothing: this fake holds no state that could be refreshed. */
-  @Override
-  public void refresh() throws IOException {
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
deleted file mode 100644
index 68b3818..0000000
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.gcp.options;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.cloudresourcemanager.CloudResourceManager;
-import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects;
-import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Get;
-import com.google.api.services.cloudresourcemanager.model.Project;
-import com.google.api.services.storage.model.Bucket;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.DefaultProjectFactory;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.GcpTempLocationFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.RestoreSystemProperties;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestRule;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link GcpOptions}.
- *
- * <p>Runs under the {@link Enclosed} runner, so each nested static class below is its own JUnit4
- * suite. {@code Common} covers default-project discovery and temp-location defaults.
- * {@code GcpTempLocation} covers default-bucket creation against mocked GCS and Cloud Resource
- * Manager clients.
- */
-@RunWith(Enclosed.class)
-public class GcpOptionsTest {
-
-  /** Tests for the majority of methods. */
-  @RunWith(JUnit4.class)
-  public static class Common {
-    // NOTE(review): presumably restores the system properties (user.home, os.name) that tests
-    // below overwrite via System.setProperty, so they do not leak between tests.
-    @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
-    @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
-    @Test
-    public void testGetProjectFromCloudSdkConfigEnv() throws Exception {
-      // CLOUDSDK_CONFIG points at the temp folder, which holds the "properties" file.
-      Map<String, String> environment =
-          ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath());
-      assertEquals("test-project",
-          runGetProjectTest(tmpFolder.newFile("properties"), environment));
-    }
-
-    @Test
-    public void testGetProjectFromAppDataEnv() throws Exception {
-      Map<String, String> environment =
-          ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath());
-      // Pretend to run on Windows; the config is then expected under APPDATA/gcloud/properties.
-      System.setProperty("os.name", "windows");
-      assertEquals("test-project",
-          runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"),
-              environment));
-    }
-
-    @Test
-    public void testGetProjectFromUserHomeEnvOld() throws Exception {
-      // Older layout: <user.home>/.config/gcloud/properties.
-      Map<String, String> environment = ImmutableMap.of();
-      System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
-      assertEquals("test-project",
-          runGetProjectTest(
-              new File(tmpFolder.newFolder(".config", "gcloud"), "properties"),
-              environment));
-    }
-
-    @Test
-    public void testGetProjectFromUserHomeEnv() throws Exception {
-      // Newer layout: <user.home>/.config/gcloud/configurations/config_default.
-      Map<String, String> environment = ImmutableMap.of();
-      System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
-      assertEquals("test-project", runGetProjectTest(
-          new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
-          environment));
-    }
-
-    @Test
-    public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception {
-      // Both layouts exist with different projects; the newer config_default must take priority.
-      Map<String, String> environment = ImmutableMap.of();
-      System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
-      makePropertiesFileWithProject(
-          new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), "old-project");
-      assertEquals("test-project", runGetProjectTest(
-          new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
-          environment));
-    }
-
-    @Test
-    public void testUnableToGetDefaultProject() throws Exception {
-      // user.home is an empty temp folder and the environment is empty, so no project is found.
-      System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
-      DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
-      when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of());
-      assertNull(projectFactory.create(PipelineOptionsFactory.create()));
-    }
-
-    @Test
-    public void testEmptyGcpTempLocation() throws Exception {
-      // No tempLocation is set and the project is empty, so deriving gcpTempLocation must fail.
-      GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
-      options.setGcpCredential(new TestCredential());
-      options.setProject("");
-      thrown.expect(IllegalArgumentException.class);
-      thrown.expectMessage("--project is a required option");
-      options.getGcpTempLocation();
-    }
-
-    @Test
-    public void testDefaultGcpTempLocation() throws Exception {
-      // gcpTempLocation defaults to a gs:// tempLocation. NoopPathValidator presumably avoids a
-      // real GCS existence check here.
-      GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
-      String tempLocation = "gs://bucket";
-      options.setTempLocation(tempLocation);
-      options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);
-      assertEquals(tempLocation, options.getGcpTempLocation());
-    }
-
-    @Test
-    public void testDefaultGcpTempLocationInvalid() throws Exception {
-      // A non-GCS tempLocation cannot serve as the default gcpTempLocation.
-      GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
-      options.setTempLocation("file://");
-      thrown.expect(IllegalArgumentException.class);
-      thrown.expectMessage(
-          "Error constructing default value for gcpTempLocation: tempLocation is not"
-              + " a valid GCS path");
-      options.getGcpTempLocation();
-    }
-
-    @Test
-    public void testDefaultGcpTempLocationDoesNotExist() {
-      // A well-formed gs:// path that fails validation; the cause carries the validator's reason.
-      GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
-      String tempLocation = "gs://does/not/exist";
-      options.setTempLocation(tempLocation);
-      thrown.expect(IllegalArgumentException.class);
-      thrown.expectMessage(
-          "Error constructing default value for gcpTempLocation: tempLocation is not"
-              + " a valid GCS path");
-      thrown.expectCause(
-          hasMessage(containsString("Output path does not exist or is not writeable")));
-
-      options.getGcpTempLocation();
-    }
-
-    /**
-     * Writes an INI-style gcloud properties file to {@code path}. Its {@code [core]} section sets
-     * {@code project} to {@code projectId}. The extra {@code [dataflow]} section is presumably
-     * there to check that unrelated sections are ignored.
-     */
-    private static void makePropertiesFileWithProject(File path, String projectId)
-        throws IOException {
-      String properties = String.format("[core]%n"
-          + "account = test-account@google.com%n"
-          + "project = %s%n"
-          + "%n"
-          + "[dataflow]%n"
-          + "magic = true%n", projectId);
-      Files.write(properties, path, StandardCharsets.UTF_8);
-    }
-
-    /**
-     * Writes a properties file naming {@code "test-project"} at {@code path}. Then returns the
-     * project resolved by a spied {@link DefaultProjectFactory} whose environment variables are
-     * stubbed to {@code environment}.
-     */
-    private static String runGetProjectTest(File path, Map<String, String> environment)
-        throws Exception {
-      makePropertiesFileWithProject(path, "test-project");
-      DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
-      when(projectFactory.getEnvironment()).thenReturn(environment);
-      return projectFactory.create(PipelineOptionsFactory.create());
-    }
-  }
-
-  /** Tests related to determining the GCP temp location. */
-  @RunWith(JUnit4.class)
-  public static class GcpTempLocation {
-    @Rule public ExpectedException thrown = ExpectedException.none();
-    @Mock private GcsUtil mockGcsUtil;
-    @Mock private CloudResourceManager mockCrmClient;
-    @Mock private Projects mockProjects;
-    @Mock private Get mockGet;
-    private Project fakeProject;
-    private PipelineOptions options;
-
-    @Before
-    public void setUp() throws Exception {
-      MockitoAnnotations.initMocks(this);
-      options = PipelineOptionsFactory.create();
-      options.as(GcsOptions.class).setGcsUtil(mockGcsUtil);
-      options.as(GcpOptions.class).setProject("foo");
-      options.as(GcpOptions.class).setZone("us-north1-a");
-      // Route crmClient.projects().get(<any project id>) to the mocked Get request.
-      when(mockCrmClient.projects()).thenReturn(mockProjects);
-      when(mockProjects.get(any(String.class))).thenReturn(mockGet);
-      // Project number 1: the tests below stub bucket owners to match (1) or mismatch (5) it.
-      fakeProject = new Project().setProjectNumber(1L);
-    }
-
-    @Test
-    public void testCreateBucket() throws Exception {
-      // Owner 1 matches the project number. The expected bucket name combines the region of
-      // zone us-north1-a with project number 1.
-      doReturn(fakeProject).when(mockGet).execute();
-      when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(1L);
-
-      String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
-      assertEquals("gs://dataflow-staging-us-north1-1", bucket);
-    }
-
-    @Test
-    public void testCreateBucketProjectLookupFails() throws Exception {
-      // The Cloud Resource Manager project lookup fails.
-      doThrow(new IOException("badness")).when(mockGet).execute();
-
-      thrown.expect(RuntimeException.class);
-      thrown.expectMessage("Unable to verify project");
-      GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
-    }
-
-    @Test
-    public void testCreateBucketCreateBucketFails() throws Exception {
-      doReturn(fakeProject).when(mockGet).execute();
-      doThrow(new IOException("badness")).when(
-          mockGcsUtil).createBucket(any(String.class), any(Bucket.class));
-
-      thrown.expect(RuntimeException.class);
-      // NOTE(review): "Unable create" (missing "to") presumably mirrors the production message
-      // verbatim; fix both together if the wording is corrected.
-      thrown.expectMessage("Unable create default bucket");
-      GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
-    }
-
-    @Test
-    public void testCannotGetBucketOwner() throws Exception {
-      // The project lookup succeeds, but the bucket owner lookup fails.
-      doReturn(fakeProject).when(mockGet).execute();
-      when(mockGcsUtil.bucketOwner(any(GcsPath.class)))
-          .thenThrow(new IOException("badness"));
-
-      thrown.expect(RuntimeException.class);
-      thrown.expectMessage("Unable to determine the owner");
-      GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
-    }
-
-    @Test
-    public void testProjectMismatch() throws Exception {
-      // Owner 5 differs from project number 1, so the bucket is rejected.
-      doReturn(fakeProject).when(mockGet).execute();
-      when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L);
-
-      thrown.expect(IllegalArgumentException.class);
-      thrown.expectMessage("Bucket owner does not match the project");
-      GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
-    }
-
-    @Test
-    public void regionFromZone() throws Exception {
-      // The region is the zone name with its final "-<suffix>" segment removed.
-      assertEquals("us-central1", GcpTempLocationFactory.getRegionFromZone("us-central1-a"));
-      assertEquals("asia-east", GcpTempLocationFactory.getRegionFromZone("asia-east-a"));
-    }
-  }
-}


Mime
View raw message