Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E5326200CA3 for ; Tue, 2 May 2017 19:57:10 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E3B84160BAB; Tue, 2 May 2017 17:57:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9BF1F160BBF for ; Tue, 2 May 2017 19:57:08 +0200 (CEST) Received: (qmail 99519 invoked by uid 500); 2 May 2017 17:57:07 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 99124 invoked by uid 99); 2 May 2017 17:57:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 May 2017 17:57:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9B864E2F42; Tue, 2 May 2017 17:57:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lcwik@apache.org To: commits@beam.apache.org Date: Tue, 02 May 2017 17:57:10 -0000 Message-Id: <8d0ddc77d03440c099c33f8db5f4d21b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/7] beam git commit: [BEAM-2135] Move gcp-core to google-cloud-platform-core archived-at: Tue, 02 May 2017 17:57:11 -0000 http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java deleted file mode 100644 index c8e6839..0000000 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ /dev/null @@ -1,796 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.api.client.googleapis.batch.BatchRequest; -import com.google.api.client.googleapis.batch.json.JsonBatchCallback; -import com.google.api.client.googleapis.json.GoogleJsonError; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.http.HttpHeaders; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.Sleeper; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Bucket; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; -import com.google.auto.value.AutoValue; -import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; -import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel; -import com.google.cloud.hadoop.gcsio.ObjectWriteConditions; -import com.google.cloud.hadoop.util.ApiErrorExtractor; -import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; -import com.google.cloud.hadoop.util.ClientRequestHelper; -import com.google.cloud.hadoop.util.ResilientOperation; -import com.google.cloud.hadoop.util.RetryDeterminer; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.channels.SeekableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.nio.file.AccessDeniedException; -import 
java.nio.file.FileAlreadyExistsException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.annotation.Nullable; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.options.DefaultValueFactory; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provides operations on GCS. - */ -public class GcsUtil { - /** - * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using - * any transport flags specified on the {@link PipelineOptions}. - */ - public static class GcsUtilFactory implements DefaultValueFactory { - /** - * Returns an instance of {@link GcsUtil} based on the - * {@link PipelineOptions}. - * - *

If no instance has previously been created, one is created and the value - * stored in {@code options}. - */ - @Override - public GcsUtil create(PipelineOptions options) { - LOG.debug("Creating new GcsUtil"); - GcsOptions gcsOptions = options.as(GcsOptions.class); - Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); - return new GcsUtil( - storageBuilder.build(), - storageBuilder.getHttpRequestInitializer(), - gcsOptions.getExecutorService(), - gcsOptions.getGcsUploadBufferSizeBytes()); - } - - /** - * Returns an instance of {@link GcsUtil} based on the given parameters. - */ - public static GcsUtil create( - Storage storageClient, - HttpRequestInitializer httpRequestInitializer, - ExecutorService executorService, - @Nullable Integer uploadBufferSizeBytes) { - return new GcsUtil( - storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes); - } - } - - private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class); - - /** Maximum number of items to retrieve per Objects.List request. */ - private static final long MAX_LIST_ITEMS_PER_CALL = 1024; - - /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */ - private static final Pattern GLOB_PREFIX = Pattern.compile("(?[^\\[*?]*)[\\[*?].*"); - - private static final String RECURSIVE_WILDCARD = "[*]{2}"; - - /** - * A {@link Pattern} for globs with a recursive wildcard. - */ - private static final Pattern RECURSIVE_GCS_PATTERN = - Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*"); - - /** - * Maximum number of requests permitted in a GCS batch request. - */ - private static final int MAX_REQUESTS_PER_BATCH = 100; - /** - * Maximum number of concurrent batches of requests executing on GCS. 
- */ - private static final int MAX_CONCURRENT_BATCHES = 256; - - private static final FluentBackoff BACKOFF_FACTORY = - FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)); - - ///////////////////////////////////////////////////////////////////////////// - - /** Client for the GCS API. */ - private Storage storageClient; - private final HttpRequestInitializer httpRequestInitializer; - /** Buffer size for GCS uploads (in bytes). */ - @Nullable private final Integer uploadBufferSizeBytes; - - // Helper delegate for turning IOExceptions from API calls into higher-level semantics. - private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); - - // Exposed for testing. - final ExecutorService executorService; - - /** - * Returns true if the given GCS pattern is supported otherwise fails with an - * exception. - */ - public static boolean isGcsPatternSupported(String gcsPattern) { - if (RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches()) { - throw new IllegalArgumentException("Unsupported wildcard usage in \"" + gcsPattern + "\": " - + " recursive wildcards are not supported."); - } - return true; - } - - /** - * Returns the prefix portion of the glob that doesn't contain wildcards. - */ - public static String getGlobPrefix(String globExp) { - checkArgument(isGcsPatternSupported(globExp)); - Matcher m = GLOB_PREFIX.matcher(globExp); - checkArgument( - m.matches(), - String.format("Glob expression: [%s] is not expandable.", globExp)); - return m.group("PREFIX"); - } - - /** - * Expands glob expressions to regular expressions. 
- * - * @param globExp the glob expression to expand - * @return a string with the regular expression this glob expands to - */ - public static String globToRegexp(String globExp) { - StringBuilder dst = new StringBuilder(); - char[] src = globExp.toCharArray(); - int i = 0; - while (i < src.length) { - char c = src[i++]; - switch (c) { - case '*': - dst.append("[^/]*"); - break; - case '?': - dst.append("[^/]"); - break; - case '.': - case '+': - case '{': - case '}': - case '(': - case ')': - case '|': - case '^': - case '$': - // These need to be escaped in regular expressions - dst.append('\\').append(c); - break; - case '\\': - i = doubleSlashes(dst, src, i); - break; - default: - dst.append(c); - break; - } - } - return dst.toString(); - } - - /** - * Returns true if the given {@code spec} contains glob. - */ - public static boolean isGlob(GcsPath spec) { - return GLOB_PREFIX.matcher(spec.getObject()).matches(); - } - - private GcsUtil( - Storage storageClient, - HttpRequestInitializer httpRequestInitializer, - ExecutorService executorService, - @Nullable Integer uploadBufferSizeBytes) { - this.storageClient = storageClient; - this.httpRequestInitializer = httpRequestInitializer; - this.uploadBufferSizeBytes = uploadBufferSizeBytes; - this.executorService = executorService; - } - - // Use this only for testing purposes. - protected void setStorageClient(Storage storageClient) { - this.storageClient = storageClient; - } - - /** - * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded - * in the result. For patterns that only match a single object, we ensure that the object - * exists. - */ - public List expand(GcsPath gcsPattern) throws IOException { - checkArgument(isGcsPatternSupported(gcsPattern.getObject())); - Pattern p = null; - String prefix = null; - if (!isGlob(gcsPattern)) { - // Not a glob. - try { - // Use a get request to fetch the metadata of the object, and ignore the return value. 
- // The request has strong global consistency. - getObject(gcsPattern); - return ImmutableList.of(gcsPattern); - } catch (FileNotFoundException e) { - // If the path was not found, return an empty list. - return ImmutableList.of(); - } - } else { - // Part before the first wildcard character. - prefix = getGlobPrefix(gcsPattern.getObject()); - p = Pattern.compile(globToRegexp(gcsPattern.getObject())); - } - - LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(), - prefix, p.toString()); - - String pageToken = null; - List results = new LinkedList<>(); - do { - Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken); - if (objects.getItems() == null) { - break; - } - - // Filter objects based on the regex. - for (StorageObject o : objects.getItems()) { - String name = o.getName(); - // Skip directories, which end with a slash. - if (p.matcher(name).matches() && !name.endsWith("/")) { - LOG.debug("Matched object: {}", name); - results.add(GcsPath.fromObject(o)); - } - } - pageToken = objects.getNextPageToken(); - } while (pageToken != null); - - return results; - } - - @VisibleForTesting - @Nullable - Integer getUploadBufferSizeBytes() { - return uploadBufferSizeBytes; - } - - /** - * Returns the file size from GCS or throws {@link FileNotFoundException} - * if the resource does not exist. - */ - public long fileSize(GcsPath path) throws IOException { - return getObject(path).getSize().longValue(); - } - - /** - * Returns the {@link StorageObject} for the given {@link GcsPath}. 
- */ - public StorageObject getObject(GcsPath gcsPath) throws IOException { - return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); - } - - @VisibleForTesting - StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException { - Storage.Objects.Get getObject = - storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject()); - try { - return ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(getObject), - backoff, - RetryDeterminer.SOCKET_ERRORS, - IOException.class, - sleeper); - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) { - throw new FileNotFoundException(gcsPath.toString()); - } - throw new IOException( - String.format("Unable to get the file object for path %s.", gcsPath), - e); - } - } - - /** - * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} for the given - * {@link GcsPath GcsPaths}. - */ - public List getObjects(List gcsPaths) - throws IOException { - List results = new ArrayList<>(); - executeBatches(makeGetBatches(gcsPaths, results)); - ImmutableList.Builder ret = ImmutableList.builder(); - for (StorageObjectOrIOException[] result : results) { - ret.add(result[0]); - } - return ret.build(); - } - - /** - * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}. - */ - public Objects listObjects(String bucket, String prefix, @Nullable String pageToken) - throws IOException { - // List all objects that start with the prefix (including objects in sub-directories). 
- Storage.Objects.List listObject = storageClient.objects().list(bucket); - listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL); - listObject.setPrefix(prefix); - - if (pageToken != null) { - listObject.setPageToken(pageToken); - } - - try { - return ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(listObject), - BACKOFF_FACTORY.backoff(), - RetryDeterminer.SOCKET_ERRORS, - IOException.class); - } catch (Exception e) { - throw new IOException( - String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix), - e); - } - } - - /** - * Returns the file size from GCS or throws {@link FileNotFoundException} - * if the resource does not exist. - */ - @VisibleForTesting - List fileSizes(List paths) throws IOException { - List results = getObjects(paths); - - ImmutableList.Builder ret = ImmutableList.builder(); - for (StorageObjectOrIOException result : results) { - ret.add(toFileSize(result)); - } - return ret.build(); - } - - private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException) - throws IOException { - if (storageObjectOrIOException.ioException() != null) { - throw storageObjectOrIOException.ioException(); - } else { - return storageObjectOrIOException.storageObject().getSize().longValue(); - } - } - - /** - * Opens an object in GCS. - * - *

Returns a SeekableByteChannel that provides access to data in the bucket. - * - * @param path the GCS filename to read from - * @return a SeekableByteChannel that can read the object data - */ - public SeekableByteChannel open(GcsPath path) - throws IOException { - return new GoogleCloudStorageReadChannel(storageClient, path.getBucket(), - path.getObject(), errorExtractor, - new ClientRequestHelper()); - } - - /** - * Creates an object in GCS. - * - *

Returns a WritableByteChannel that can be used to write data to the - * object. - * - * @param path the GCS file to write to - * @param type the type of object, eg "text/plain". - * @return a Callable object that encloses the operation. - */ - public WritableByteChannel create(GcsPath path, - String type) throws IOException { - GoogleCloudStorageWriteChannel channel = new GoogleCloudStorageWriteChannel( - executorService, - storageClient, - new ClientRequestHelper(), - path.getBucket(), - path.getObject(), - AsyncWriteChannelOptions.newBuilder().build(), - new ObjectWriteConditions(), - Collections.emptyMap(), - type); - if (uploadBufferSizeBytes != null) { - channel.setUploadBufferSize(uploadBufferSizeBytes); - } - channel.initialize(); - return channel; - } - - /** - * Returns whether the GCS bucket exists and is accessible. - */ - public boolean bucketAccessible(GcsPath path) throws IOException { - return bucketAccessible( - path, - BACKOFF_FACTORY.backoff(), - Sleeper.DEFAULT); - } - - /** - * Returns the project number of the project which owns this bucket. - * If the bucket exists, it must be accessible otherwise the permissions - * exception will be propagated. If the bucket does not exist, an exception - * will be thrown. - */ - public long bucketOwner(GcsPath path) throws IOException { - return getBucket( - path, - BACKOFF_FACTORY.backoff(), - Sleeper.DEFAULT).getProjectNumber().longValue(); - } - - /** - * Creates a {@link Bucket} under the specified project in Cloud Storage or - * propagates an exception. - */ - public void createBucket(String projectId, Bucket bucket) throws IOException { - createBucket( - projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); - } - - /** - * Returns whether the GCS bucket exists. This will return false if the bucket - * is inaccessible due to permissions. 
- */ - @VisibleForTesting - boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { - try { - return getBucket(path, backoff, sleeper) != null; - } catch (AccessDeniedException | FileNotFoundException e) { - return false; - } - } - - @VisibleForTesting - @Nullable - Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { - Storage.Buckets.Get getBucket = - storageClient.buckets().get(path.getBucket()); - - try { - Bucket bucket = ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(getBucket), - backoff, - new RetryDeterminer() { - @Override - public boolean shouldRetry(IOException e) { - if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { - return false; - } - return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); - } - }, - IOException.class, - sleeper); - - return bucket; - } catch (GoogleJsonResponseException e) { - if (errorExtractor.accessDenied(e)) { - throw new AccessDeniedException(path.toString(), null, e.getMessage()); - } - if (errorExtractor.itemNotFound(e)) { - throw new FileNotFoundException(e.getMessage()); - } - throw e; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException( - String.format("Error while attempting to verify existence of bucket gs://%s", - path.getBucket()), e); - } - } - - @VisibleForTesting - void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper) - throws IOException { - Storage.Buckets.Insert insertBucket = - storageClient.buckets().insert(projectId, bucket); - insertBucket.setPredefinedAcl("projectPrivate"); - insertBucket.setPredefinedDefaultObjectAcl("projectPrivate"); - - try { - ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(insertBucket), - backoff, - new RetryDeterminer() { - @Override - public boolean shouldRetry(IOException e) { - if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) { - return 
false; - } - return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); - } - }, - IOException.class, - sleeper); - return; - } catch (GoogleJsonResponseException e) { - if (errorExtractor.accessDenied(e)) { - throw new AccessDeniedException(bucket.getName(), null, e.getMessage()); - } - if (errorExtractor.itemAlreadyExists(e)) { - throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage()); - } - throw e; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException( - String.format("Error while attempting to create bucket gs://%s for rproject %s", - bucket.getName(), projectId), e); - } - } - - private static void executeBatches(List batches) throws IOException { - ListeningExecutorService executor = MoreExecutors.listeningDecorator( - MoreExecutors.getExitingExecutorService( - new ThreadPoolExecutor(MAX_CONCURRENT_BATCHES, MAX_CONCURRENT_BATCHES, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue()))); - - List> futures = new LinkedList<>(); - for (final BatchRequest batch : batches) { - futures.add(executor.submit(new Callable() { - public Void call() throws IOException { - batch.execute(); - return null; - } - })); - } - - try { - Futures.allAsList(futures).get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while executing batch GCS request", e); - } catch (ExecutionException e) { - if (e.getCause() instanceof FileNotFoundException) { - throw (FileNotFoundException) e.getCause(); - } - throw new IOException("Error executing batch GCS request", e); - } finally { - executor.shutdown(); - } - } - - /** - * Makes get {@link BatchRequest BatchRequests}. - * - * @param paths {@link GcsPath GcsPaths}. - * @param results mutable {@link List} for return values. - * @return {@link BatchRequest BatchRequests} to execute. 
- * @throws IOException - */ - @VisibleForTesting - List makeGetBatches( - Collection paths, - List results) throws IOException { - List batches = new LinkedList<>(); - for (List filesToGet : - Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { - BatchRequest batch = createBatchRequest(); - for (GcsPath path : filesToGet) { - results.add(enqueueGetFileSize(path, batch)); - } - batches.add(batch); - } - return batches; - } - - public void copy(Iterable srcFilenames, Iterable destFilenames) - throws IOException { - executeBatches(makeCopyBatches(srcFilenames, destFilenames)); - } - - List makeCopyBatches(Iterable srcFilenames, Iterable destFilenames) - throws IOException { - List srcList = Lists.newArrayList(srcFilenames); - List destList = Lists.newArrayList(destFilenames); - checkArgument( - srcList.size() == destList.size(), - "Number of source files %s must equal number of destination files %s", - srcList.size(), - destList.size()); - - List batches = new LinkedList<>(); - BatchRequest batch = createBatchRequest(); - for (int i = 0; i < srcList.size(); i++) { - final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i)); - final GcsPath destPath = GcsPath.fromUri(destList.get(i)); - enqueueCopy(sourcePath, destPath, batch); - if (batch.size() >= MAX_REQUESTS_PER_BATCH) { - batches.add(batch); - batch = createBatchRequest(); - } - } - if (batch.size() > 0) { - batches.add(batch); - } - return batches; - } - - List makeRemoveBatches(Collection filenames) throws IOException { - List batches = new LinkedList<>(); - for (List filesToDelete : - Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) { - BatchRequest batch = createBatchRequest(); - for (String file : filesToDelete) { - enqueueDelete(GcsPath.fromUri(file), batch); - } - batches.add(batch); - } - return batches; - } - - public void remove(Collection filenames) throws IOException { - executeBatches(makeRemoveBatches(filenames)); - } - - private StorageObjectOrIOException[] 
enqueueGetFileSize(final GcsPath path, BatchRequest batch) - throws IOException { - final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1]; - - Storage.Objects.Get getRequest = storageClient.objects() - .get(path.getBucket(), path.getObject()); - getRequest.queue(batch, new JsonBatchCallback() { - @Override - public void onSuccess(StorageObject response, HttpHeaders httpHeaders) throws IOException { - ret[0] = StorageObjectOrIOException.create(response); - } - - @Override - public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException { - IOException ioException; - if (errorExtractor.itemNotFound(e)) { - ioException = new FileNotFoundException(path.toString()); - } else { - ioException = new IOException(String.format("Error trying to get %s: %s", path, e)); - } - ret[0] = StorageObjectOrIOException.create(ioException); - } - }); - return ret; - } - - /** - * A class that holds either a {@link StorageObject} or an {@link IOException}. - */ - @AutoValue - public abstract static class StorageObjectOrIOException { - - /** - * Returns the {@link StorageObject}. - */ - @Nullable - public abstract StorageObject storageObject(); - - /** - * Returns the {@link IOException}. 
- */ - @Nullable - public abstract IOException ioException(); - - @VisibleForTesting - public static StorageObjectOrIOException create(StorageObject storageObject) { - return new AutoValue_GcsUtil_StorageObjectOrIOException( - checkNotNull(storageObject, "storageObject"), - null /* ioException */); - } - - @VisibleForTesting - public static StorageObjectOrIOException create(IOException ioException) { - return new AutoValue_GcsUtil_StorageObjectOrIOException( - null /* storageObject */, - checkNotNull(ioException, "ioException")); - } - } - - private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch) - throws IOException { - Storage.Objects.Copy copyRequest = storageClient.objects() - .copy(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null); - copyRequest.queue(batch, new JsonBatchCallback() { - @Override - public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) { - LOG.debug("Successfully copied {} to {}", from, to); - } - - @Override - public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { - throw new IOException( - String.format("Error trying to copy %s to %s: %s", from, to, e)); - } - }); - } - - private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException { - Storage.Objects.Delete deleteRequest = storageClient.objects() - .delete(file.getBucket(), file.getObject()); - deleteRequest.queue(batch, new JsonBatchCallback() { - @Override - public void onSuccess(Void obj, HttpHeaders responseHeaders) { - LOG.debug("Successfully deleted {}", file); - } - - @Override - public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { - throw new IOException(String.format("Error trying to delete %s: %s", file, e)); - } - }); - } - - private BatchRequest createBatchRequest() { - return storageClient.batch(httpRequestInitializer); - } - - private static int doubleSlashes(StringBuilder dst, char[] src, int i) { - // Emit the 
next character without special interpretation - dst.append('\\'); - if ((i - 1) != src.length) { - dst.append(src[i]); - i++; - } else { - // A backslash at the very end is treated like an escaped backslash - dst.append('\\'); - } - return i; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java deleted file mode 100644 index 2b7135e..0000000 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import com.google.api.client.http.HttpBackOffIOExceptionHandler; -import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler; -import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.HttpResponseInterceptor; -import com.google.api.client.http.HttpUnsuccessfulResponseHandler; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.ExponentialBackOff; -import com.google.api.client.util.NanoClock; -import com.google.api.client.util.Sleeper; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import javax.annotation.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements a request initializer that adds retry handlers to all - * HttpRequests. - * - *

Also can take a HttpResponseInterceptor to be applied to the responses. - */ -public class RetryHttpRequestInitializer implements HttpRequestInitializer { - - private static final Logger LOG = LoggerFactory.getLogger(RetryHttpRequestInitializer.class); - - /** - * Http response codes that should be silently ignored. - */ - private static final Set DEFAULT_IGNORED_RESPONSE_CODES = new HashSet<>( - Arrays.asList(307 /* Redirect, handled by the client library */, - 308 /* Resume Incomplete, handled by the client library */)); - - /** - * Http response timeout to use for hanging gets. - */ - private static final int HANGING_GET_TIMEOUT_SEC = 80; - - private static class LoggingHttpBackOffIOExceptionHandler - extends HttpBackOffIOExceptionHandler { - public LoggingHttpBackOffIOExceptionHandler(BackOff backOff) { - super(backOff); - } - - @Override - public boolean handleIOException(HttpRequest request, boolean supportsRetry) - throws IOException { - boolean willRetry = super.handleIOException(request, supportsRetry); - if (willRetry) { - LOG.debug("Request failed with IOException, will retry: {}", request.getUrl()); - } else { - LOG.warn("Request failed with IOException, will NOT retry: {}", request.getUrl()); - } - return willRetry; - } - } - - private static class LoggingHttpBackoffUnsuccessfulResponseHandler - implements HttpUnsuccessfulResponseHandler { - private final HttpBackOffUnsuccessfulResponseHandler handler; - private final Set ignoredResponseCodes; - - public LoggingHttpBackoffUnsuccessfulResponseHandler(BackOff backoff, - Sleeper sleeper, Set ignoredResponseCodes) { - this.ignoredResponseCodes = ignoredResponseCodes; - handler = new HttpBackOffUnsuccessfulResponseHandler(backoff); - handler.setSleeper(sleeper); - handler.setBackOffRequired( - new HttpBackOffUnsuccessfulResponseHandler.BackOffRequired() { - @Override - public boolean isRequired(HttpResponse response) { - int statusCode = response.getStatusCode(); - return (statusCode / 100 == 5) || // 5xx: 
server error - statusCode == 429; // 429: Too many requests - } - }); - } - - @Override - public boolean handleResponse(HttpRequest request, HttpResponse response, - boolean supportsRetry) throws IOException { - boolean retry = handler.handleResponse(request, response, supportsRetry); - if (retry) { - LOG.debug("Request failed with code {} will retry: {}", - response.getStatusCode(), request.getUrl()); - - } else if (!ignoredResponseCodes.contains(response.getStatusCode())) { - LOG.warn("Request failed with code {}, will NOT retry: {}", - response.getStatusCode(), request.getUrl()); - } - - return retry; - } - } - - private final HttpResponseInterceptor responseInterceptor; // response Interceptor to use - - private final NanoClock nanoClock; // used for testing - - private final Sleeper sleeper; // used for testing - - private Set ignoredResponseCodes = new HashSet<>(DEFAULT_IGNORED_RESPONSE_CODES); - - public RetryHttpRequestInitializer() { - this(Collections.emptyList()); - } - - /** - * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged. - */ - public RetryHttpRequestInitializer(Collection additionalIgnoredResponseCodes) { - this(additionalIgnoredResponseCodes, null); - } - - /** - * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged. - * @param responseInterceptor HttpResponseInterceptor to be applied on all requests. May be null. - */ - public RetryHttpRequestInitializer( - Collection additionalIgnoredResponseCodes, - @Nullable HttpResponseInterceptor responseInterceptor) { - this(NanoClock.SYSTEM, Sleeper.DEFAULT, additionalIgnoredResponseCodes, - responseInterceptor); - } - - /** - * Visible for testing. - * - * @param nanoClock used as a timing source for knowing how much time has elapsed. - * @param sleeper used to sleep between retries. - * @param additionalIgnoredResponseCodes a list of HTTP status codes that should not be logged. 
- */ - RetryHttpRequestInitializer( - NanoClock nanoClock, Sleeper sleeper, Collection additionalIgnoredResponseCodes, - HttpResponseInterceptor responseInterceptor) { - this.nanoClock = nanoClock; - this.sleeper = sleeper; - this.ignoredResponseCodes.addAll(additionalIgnoredResponseCodes); - this.responseInterceptor = responseInterceptor; - } - - @Override - public void initialize(HttpRequest request) throws IOException { - // Set a timeout for hanging-gets. - // TODO: Do this exclusively for work requests. - request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000); - - // Back off on retryable http errors. - request.setUnsuccessfulResponseHandler( - // A back-off multiplier of 2 raises the maximum request retrying time - // to approximately 5 minutes (keeping other back-off parameters to - // their default values). - new LoggingHttpBackoffUnsuccessfulResponseHandler( - new ExponentialBackOff.Builder().setNanoClock(nanoClock) - .setMultiplier(2).build(), - sleeper, ignoredResponseCodes)); - - // Retry immediately on IOExceptions. 
- LoggingHttpBackOffIOExceptionHandler loggingBackoffHandler = - new LoggingHttpBackOffIOExceptionHandler(BackOff.ZERO_BACKOFF); - request.setIOExceptionHandler(loggingBackoffHandler); - - // Set response initializer - if (responseInterceptor != null) { - request.setResponseInterceptor(responseInterceptor); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java deleted file mode 100644 index b8474bb..0000000 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.services.storage.Storage; -import com.google.auth.Credentials; -import com.google.auth.http.HttpCredentialsAdapter; -import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.security.GeneralSecurityException; -import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; - -/** - * Helpers for cloud communication. - */ -public class Transport { - - private static class SingletonHelper { - /** Global instance of the JSON factory. */ - private static final JsonFactory JSON_FACTORY; - - /** Global instance of the HTTP transport. 
*/ - private static final HttpTransport HTTP_TRANSPORT; - - static { - try { - JSON_FACTORY = JacksonFactory.getDefaultInstance(); - HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport(); - } catch (GeneralSecurityException | IOException e) { - throw new RuntimeException(e); - } - } - } - - public static HttpTransport getTransport() { - return SingletonHelper.HTTP_TRANSPORT; - } - - public static JsonFactory getJsonFactory() { - return SingletonHelper.JSON_FACTORY; - } - - private static class ApiComponents { - public String rootUrl; - public String servicePath; - - public ApiComponents(String root, String path) { - this.rootUrl = root; - this.servicePath = path; - } - } - - private static ApiComponents apiComponentsFromUrl(String urlString) { - try { - URL url = new URL(urlString); - String rootUrl = url.getProtocol() + "://" + url.getHost() - + (url.getPort() > 0 ? ":" + url.getPort() : ""); - return new ApiComponents(rootUrl, url.getPath()); - } catch (MalformedURLException e) { - throw new RuntimeException("Invalid URL: " + urlString); - } - } - - /** - * Returns a Cloud Storage client builder using the specified {@link GcsOptions}. - */ - public static Storage.Builder - newStorageClient(GcsOptions options) { - String servicePath = options.getGcsEndpoint(); - Storage.Builder storageBuilder = new Storage.Builder(getTransport(), getJsonFactory(), - chainHttpRequestInitializer( - options.getGcpCredential(), - // Do not log the code 404. Code up the stack will deal with 404's if needed, and - // logging it by default clutters the output during file staging. 
- new RetryHttpRequestInitializer( - ImmutableList.of(404), new UploadIdResponseInterceptor()))) - .setApplicationName(options.getAppName()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - if (servicePath != null) { - ApiComponents components = apiComponentsFromUrl(servicePath); - storageBuilder.setRootUrl(components.rootUrl); - storageBuilder.setServicePath(components.servicePath); - } - return storageBuilder; - } - - private static HttpRequestInitializer chainHttpRequestInitializer( - Credentials credential, HttpRequestInitializer httpRequestInitializer) { - if (credential == null) { - return new ChainingHttpRequestInitializer( - new NullCredentialInitializer(), httpRequestInitializer); - } else { - return new ChainingHttpRequestInitializer( - new HttpCredentialsAdapter(credential), - httpRequestInitializer); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java deleted file mode 100644 index 6a71bdc..0000000 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java +++ /dev/null @@ -1,627 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.gcsfs; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Strings.isNullOrEmpty; - -import com.google.api.services.storage.model.StorageObject; -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.file.FileSystem; -import java.nio.file.LinkOption; -import java.nio.file.Path; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import java.util.Iterator; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -/** - * Implements the Java NIO {@link Path} API for Google Cloud Storage paths. - * - *

GcsPath uses a slash ('/') as a directory separator. Below is - * a summary of how slashes are treated: - *

    - *
  • A GCS bucket may not contain a slash. An object may contain zero or - * more slashes. - *
  • A trailing slash always indicates a directory, which is compliant - * with POSIX.1-2008. - *
  • Slashes separate components of a path. Empty components are allowed, - * these are represented as repeated slashes. An empty component always - * refers to a directory, and always ends in a slash. - *
  • {@link #getParent()}} always returns a path ending in a slash, as the - * parent of a GcsPath is always a directory. - *
  • Use {@link #resolve(String)} to append elements to a GcsPath -- this - * applies the rules consistently and is highly recommended over any - * custom string concatenation. - *
- * - *

GcsPath treats all GCS objects and buckets as belonging to the same - * filesystem, so the root of a GcsPath is the GcsPath bucket="", object="". - * - *

Relative paths are not associated with any bucket. This matches common - * treatment of Path in which relative paths can be constructed from one - * filesystem and appended to another filesystem. - * - * @see Java Tutorials: Path Operations - */ -public class GcsPath implements Path, Serializable { - - public static final String SCHEME = "gs"; - - /** - * Creates a GcsPath from a URI. - * - *

The URI must be in the form {@code gs://[bucket]/[path]}, and may not - * contain a port, user info, a query, or a fragment. - */ - public static GcsPath fromUri(URI uri) { - checkArgument(uri.getScheme().equalsIgnoreCase(SCHEME), "URI: %s is not a GCS URI", uri); - checkArgument(uri.getPort() == -1, - "GCS URI may not specify port: %s (%i)", uri, uri.getPort()); - checkArgument( - isNullOrEmpty(uri.getUserInfo()), - "GCS URI may not specify userInfo: %s (%s)", uri, uri.getUserInfo()); - checkArgument( - isNullOrEmpty(uri.getQuery()), - "GCS URI may not specify query: %s (%s)", uri, uri.getQuery()); - checkArgument( - isNullOrEmpty(uri.getFragment()), - "GCS URI may not specify fragment: %s (%s)", uri, uri.getFragment()); - - return fromUri(uri.toString()); - } - - /** - * Pattern that is used to parse a GCS URL. - * - *

This is used to separate the components. Verification is handled - * separately. - */ - public static final Pattern GCS_URI = - Pattern.compile("(?<SCHEME>[^:]+)://(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?"); - - /** - * Creates a GcsPath from a URI in string form. - * - *

This does not use URI parsing, which means it may accept patterns that - * the URI parser would not accept. - */ - public static GcsPath fromUri(String uri) { - Matcher m = GCS_URI.matcher(uri); - checkArgument(m.matches(), "Invalid GCS URI: %s", uri); - - checkArgument(m.group("SCHEME").equalsIgnoreCase(SCHEME), - "URI: %s is not a GCS URI", uri); - return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT")); - } - - /** - * Pattern that is used to parse a GCS resource name. - */ - private static final Pattern GCS_RESOURCE_NAME = - Pattern.compile("storage.googleapis.com/(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?"); - - /** - * Creates a GcsPath from a OnePlatform resource name in string form. - */ - public static GcsPath fromResourceName(String name) { - Matcher m = GCS_RESOURCE_NAME.matcher(name); - checkArgument(m.matches(), "Invalid GCS resource name: %s", name); - - return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT")); - } - - /** - * Creates a GcsPath from a {@linkplain StorageObject}. - */ - public static GcsPath fromObject(StorageObject object) { - return new GcsPath(null, object.getBucket(), object.getName()); - } - - /** - * Creates a GcsPath from bucket and object components. - * - *

A GcsPath without a bucket name is treated as a relative path, which - * is a path component with no linkage to the root element. This is similar - * to a Unix path that does not begin with the root marker (a slash). - * GCS has different naming constraints and APIs for working with buckets and - * objects, so these two concepts are kept separate to avoid accidental - * attempts to treat objects as buckets, or vice versa, as much as possible. - * - *

A GcsPath without an object name is a bucket reference. - * A bucket is always a directory, which could be used to lookup or add - * files to a bucket, but could not be opened as a file. - * - *

A GcsPath containing neither bucket or object names is treated as - * the root of the GCS filesystem. A listing on the root element would return - * the buckets available to the user. - * - *

If {@code null} is passed as either parameter, it is converted to an - * empty string internally for consistency. There is no distinction between - * an empty string and a {@code null}, as neither are allowed by GCS. - * - * @param bucket a GCS bucket name, or none ({@code null} or an empty string) - * if the object is not associated with a bucket - * (e.g. relative paths or the root node). - * @param object a GCS object path, or none ({@code null} or an empty string) - * for no object. - */ - public static GcsPath fromComponents(@Nullable String bucket, - @Nullable String object) { - return new GcsPath(null, bucket, object); - } - - @Nullable - private transient FileSystem fs; - @Nonnull - private final String bucket; - @Nonnull - private final String object; - - /** - * Constructs a GcsPath. - * - * @param fs the associated FileSystem, if any - * @param bucket the associated bucket, or none ({@code null} or an empty - * string) for a relative path component - * @param object the object, which is a fully-qualified object name if bucket - * was also provided, or none ({@code null} or an empty string) - * for no object - * @throws java.lang.IllegalArgumentException if the bucket of object names - * are invalid. - */ - public GcsPath(@Nullable FileSystem fs, - @Nullable String bucket, - @Nullable String object) { - if (bucket == null) { - bucket = ""; - } - checkArgument(!bucket.contains("/"), - "GCS bucket may not contain a slash"); - checkArgument(bucket.isEmpty() - || bucket.matches("[a-z0-9][-_a-z0-9.]+[a-z0-9]"), - "GCS bucket names must contain only lowercase letters, numbers, " - + "dashes (-), underscores (_), and dots (.). Bucket names " - + "must start and end with a number or letter. " - + "See https://developers.google.com/storage/docs/bucketnaming " - + "for more details. 
Bucket name: " + bucket); - - if (object == null) { - object = ""; - } - checkArgument( - object.indexOf('\n') < 0 && object.indexOf('\r') < 0, - "GCS object names must not contain Carriage Return or " - + "Line Feed characters."); - - this.fs = fs; - this.bucket = bucket; - this.object = object; - } - - /** - * Returns the bucket name associated with this GCS path, or an empty string - * if this is a relative path component. - */ - public String getBucket() { - return bucket; - } - - /** - * Returns the object name associated with this GCS path, or an empty string - * if no object is specified. - */ - public String getObject() { - return object; - } - - public void setFileSystem(FileSystem fs) { - this.fs = fs; - } - - @Override - public FileSystem getFileSystem() { - return fs; - } - - // Absolute paths are those that have a bucket and the root path. - @Override - public boolean isAbsolute() { - return !bucket.isEmpty() || object.isEmpty(); - } - - @Override - public GcsPath getRoot() { - return new GcsPath(fs, "", ""); - } - - @Override - public GcsPath getFileName() { - int nameCount = getNameCount(); - if (nameCount < 2) { - throw new UnsupportedOperationException( - "Can't get filename from root path in the bucket: " + this); - } - return getName(nameCount - 1); - } - - /** - * Returns the parent path, or {@code null} if this path does not - * have a parent. - * - *

Returns a path that ends in '/', as the parent path always refers to - * a directory. - */ - @Override - public GcsPath getParent() { - if (bucket.isEmpty() && object.isEmpty()) { - // The root path has no parent, by definition. - return null; - } - - if (object.isEmpty()) { - // A GCS bucket. All buckets come from a common root. - return getRoot(); - } - - // Skip last character, in case it is a trailing slash. - int i = object.lastIndexOf('/', object.length() - 2); - if (i <= 0) { - if (bucket.isEmpty()) { - // Relative paths are not attached to the root node. - return null; - } - return new GcsPath(fs, bucket, ""); - } - - // Retain trailing slash. - return new GcsPath(fs, bucket, object.substring(0, i + 1)); - } - - @Override - public int getNameCount() { - int count = bucket.isEmpty() ? 0 : 1; - if (object.isEmpty()) { - return count; - } - - // Add another for each separator found. - int index = -1; - while ((index = object.indexOf('/', index + 1)) != -1) { - count++; - } - - return object.endsWith("/") ? 
count : count + 1; - } - - @Override - public GcsPath getName(int count) { - checkArgument(count >= 0); - - Iterator iterator = iterator(); - for (int i = 0; i < count; ++i) { - checkArgument(iterator.hasNext()); - iterator.next(); - } - - checkArgument(iterator.hasNext()); - return (GcsPath) iterator.next(); - } - - @Override - public GcsPath subpath(int beginIndex, int endIndex) { - checkArgument(beginIndex >= 0); - checkArgument(endIndex > beginIndex); - - Iterator iterator = iterator(); - for (int i = 0; i < beginIndex; ++i) { - checkArgument(iterator.hasNext()); - iterator.next(); - } - - GcsPath path = null; - while (beginIndex < endIndex) { - checkArgument(iterator.hasNext()); - if (path == null) { - path = (GcsPath) iterator.next(); - } else { - path = path.resolve(iterator.next()); - } - ++beginIndex; - } - - return path; - } - - @Override - public boolean startsWith(Path other) { - if (other instanceof GcsPath) { - GcsPath gcsPath = (GcsPath) other; - return startsWith(gcsPath.bucketAndObject()); - } else { - return startsWith(other.toString()); - } - } - - @Override - public boolean startsWith(String prefix) { - return bucketAndObject().startsWith(prefix); - } - - @Override - public boolean endsWith(Path other) { - if (other instanceof GcsPath) { - GcsPath gcsPath = (GcsPath) other; - return endsWith(gcsPath.bucketAndObject()); - } else { - return endsWith(other.toString()); - } - } - - @Override - public boolean endsWith(String suffix) { - return bucketAndObject().endsWith(suffix); - } - - // TODO: support "." and ".." path components? 
- @Override - public GcsPath normalize() { - return this; - } - - @Override - public GcsPath resolve(Path other) { - if (other instanceof GcsPath) { - GcsPath path = (GcsPath) other; - if (path.isAbsolute()) { - return path; - } else { - return resolve(path.getObject()); - } - } else { - return resolve(other.toString()); - } - } - - @Override - public GcsPath resolve(String other) { - if (bucket.isEmpty() && object.isEmpty()) { - // Resolve on a root path is equivalent to looking up a bucket and object. - other = SCHEME + "://" + other; - } - - if (other.startsWith(SCHEME + "://")) { - GcsPath path = GcsPath.fromUri(other); - path.setFileSystem(getFileSystem()); - return path; - } - - if (other.isEmpty()) { - // An empty component MUST refer to a directory. - other = "/"; - } - - if (object.isEmpty()) { - return new GcsPath(fs, bucket, other); - } else if (object.endsWith("/")) { - return new GcsPath(fs, bucket, object + other); - } else { - return new GcsPath(fs, bucket, object + "/" + other); - } - } - - @Override - public Path resolveSibling(Path other) { - throw new UnsupportedOperationException(); - } - - @Override - public Path resolveSibling(String other) { - if (getNameCount() < 2) { - throw new UnsupportedOperationException("Can't resolve the sibling of a root path: " + this); - } - GcsPath parent = getParent(); - return (parent == null) ? fromUri(other) : parent.resolve(other); - } - - @Override - public Path relativize(Path other) { - throw new UnsupportedOperationException(); - } - - @Override - public GcsPath toAbsolutePath() { - return this; - } - - @Override - public GcsPath toRealPath(LinkOption... options) throws IOException { - return this; - } - - @Override - public File toFile() { - throw new UnsupportedOperationException(); - } - - @Override - public WatchKey register(WatchService watcher, WatchEvent.Kind[] events, - WatchEvent.Modifier... 
modifiers) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public WatchKey register(WatchService watcher, WatchEvent.Kind... events) - throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator iterator() { - return new NameIterator(fs, !bucket.isEmpty(), bucketAndObject()); - } - - private static class NameIterator implements Iterator { - private final FileSystem fs; - private boolean fullPath; - private String name; - - NameIterator(FileSystem fs, boolean fullPath, String name) { - this.fs = fs; - this.fullPath = fullPath; - this.name = name; - } - - @Override - public boolean hasNext() { - return !isNullOrEmpty(name); - } - - @Override - public GcsPath next() { - int i = name.indexOf('/'); - String component; - if (i >= 0) { - component = name.substring(0, i); - name = name.substring(i + 1); - } else { - component = name; - name = null; - } - if (fullPath) { - fullPath = false; - return new GcsPath(fs, component, ""); - } else { - // Relative paths have no bucket. - return new GcsPath(fs, "", component); - } - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - - @Override - public int compareTo(Path other) { - if (!(other instanceof GcsPath)) { - throw new ClassCastException(); - } - - GcsPath path = (GcsPath) other; - int b = bucket.compareTo(path.bucket); - if (b != 0) { - return b; - } - - // Compare a component at a time, so that the separator char doesn't - // get compared against component contents. Eg, "a/b" < "a-1/b". - Iterator left = iterator(); - Iterator right = path.iterator(); - - while (left.hasNext() && right.hasNext()) { - String leftStr = left.next().toString(); - String rightStr = right.next().toString(); - int c = leftStr.compareTo(rightStr); - if (c != 0) { - return c; - } - } - - if (!left.hasNext() && !right.hasNext()) { - return 0; - } else { - return left.hasNext() ? 
1 : -1; - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - GcsPath paths = (GcsPath) o; - return bucket.equals(paths.bucket) && object.equals(paths.object); - } - - @Override - public int hashCode() { - int result = bucket.hashCode(); - result = 31 * result + object.hashCode(); - return result; - } - - @Override - public String toString() { - if (!isAbsolute()) { - return object; - } - StringBuilder sb = new StringBuilder(); - sb.append(SCHEME) - .append("://"); - if (!bucket.isEmpty()) { - sb.append(bucket) - .append('/'); - } - sb.append(object); - return sb.toString(); - } - - // TODO: Consider using resource names for all GCS paths used by the SDK. - public String toResourceName() { - StringBuilder sb = new StringBuilder(); - sb.append("storage.googleapis.com/"); - if (!bucket.isEmpty()) { - sb.append(bucket).append('/'); - } - sb.append(object); - return sb.toString(); - } - - @Override - public URI toUri() { - try { - return new URI(SCHEME, "//" + bucketAndObject(), null); - } catch (URISyntaxException e) { - throw new RuntimeException("Unable to create URI for GCS path " + this); - } - } - - private String bucketAndObject() { - if (bucket.isEmpty()) { - return object; - } else { - return bucket + "/" + object; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java deleted file mode 100644 index 4d49f8c..0000000 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** Defines utilities used to interact with Google Cloud Storage. */ -package org.apache.beam.sdk.util.gcsfs; http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java deleted file mode 100644 index f8135e7..0000000 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** Defines Google Cloud Platform component utilities that can be used by Beam runners. */ -package org.apache.beam.sdk.util; http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java deleted file mode 100644 index a8772c3..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam; - -import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages; -import static org.hamcrest.MatcherAssert.assertThat; - -import com.google.common.collect.ImmutableSet; -import java.util.Set; -import org.apache.beam.sdk.util.ApiSurface; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** API surface verification for Google Cloud Platform core components. */ -@RunWith(JUnit4.class) -public class GcpCoreApiSurfaceTest { - - @Test - public void testApiSurface() throws Exception { - - @SuppressWarnings("unchecked") - final Set allowed = - ImmutableSet.of( - "org.apache.beam", - "com.google.api.client", - "com.google.api.services.storage", - "com.google.auth", - "com.fasterxml.jackson.annotation", - "com.fasterxml.jackson.core", - "com.fasterxml.jackson.databind", - "org.apache.avro", - "org.hamcrest", - // via DataflowMatchers - "org.codehaus.jackson", - // via Avro - "org.joda.time", - "org.junit"); - - assertThat( - ApiSurface.getSdkApiSurface(getClass().getClassLoader()), containsOnlyPackages(allowed)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java deleted file mode 100644 index 6f0846e..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/auth/TestCredential.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.gcp.auth; - -import com.google.auth.Credentials; -import java.io.IOException; -import java.net.URI; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -/** - * Fake credential, for use in testing. - */ -public class TestCredential extends Credentials { - @Override - public String getAuthenticationType() { - return "Test"; - } - - @Override - public Map> getRequestMetadata() throws IOException { - return Collections.emptyMap(); - } - - @Override - public Map> getRequestMetadata(URI uri) throws IOException { - return Collections.emptyMap(); - } - - @Override - public boolean hasRequestMetadata() { - return false; - } - - @Override - public boolean hasRequestMetadataOnly() { - return true; - } - - @Override - public void refresh() throws IOException { - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java 
b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java deleted file mode 100644 index 68b3818..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.gcp.options; - -import static org.hamcrest.Matchers.containsString; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import com.google.api.services.cloudresourcemanager.CloudResourceManager; -import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects; -import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Get; -import com.google.api.services.cloudresourcemanager.model.Project; -import com.google.api.services.storage.model.Bucket; -import com.google.common.collect.ImmutableMap; -import com.google.common.io.Files; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.DefaultProjectFactory; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.GcpTempLocationFactory; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.RestoreSystemProperties; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.NoopPathValidator; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.runners.Enclosed; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.junit.rules.TestRule; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** 
Tests for {@link GcpOptions}. */ -@RunWith(Enclosed.class) -public class GcpOptionsTest { - - /** Tests for the majority of methods. */ - @RunWith(JUnit4.class) - public static class Common { - @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testGetProjectFromCloudSdkConfigEnv() throws Exception { - Map environment = - ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath()); - assertEquals("test-project", - runGetProjectTest(tmpFolder.newFile("properties"), environment)); - } - - @Test - public void testGetProjectFromAppDataEnv() throws Exception { - Map environment = - ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath()); - System.setProperty("os.name", "windows"); - assertEquals("test-project", - runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"), - environment)); - } - - @Test - public void testGetProjectFromUserHomeEnvOld() throws Exception { - Map environment = ImmutableMap.of(); - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - assertEquals("test-project", - runGetProjectTest( - new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), - environment)); - } - - @Test - public void testGetProjectFromUserHomeEnv() throws Exception { - Map environment = ImmutableMap.of(); - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - assertEquals("test-project", runGetProjectTest( - new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), - environment)); - } - - @Test - public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception { - Map environment = ImmutableMap.of(); - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - makePropertiesFileWithProject( - new File(tmpFolder.newFolder(".config", "gcloud"), 
"properties"), "old-project"); - assertEquals("test-project", runGetProjectTest( - new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), - environment)); - } - - @Test - public void testUnableToGetDefaultProject() throws Exception { - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); - when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.of()); - assertNull(projectFactory.create(PipelineOptionsFactory.create())); - } - - @Test - public void testEmptyGcpTempLocation() throws Exception { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - options.setGcpCredential(new TestCredential()); - options.setProject(""); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("--project is a required option"); - options.getGcpTempLocation(); - } - - @Test - public void testDefaultGcpTempLocation() throws Exception { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - String tempLocation = "gs://bucket"; - options.setTempLocation(tempLocation); - options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class); - assertEquals(tempLocation, options.getGcpTempLocation()); - } - - @Test - public void testDefaultGcpTempLocationInvalid() throws Exception { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - options.setTempLocation("file://"); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "Error constructing default value for gcpTempLocation: tempLocation is not" - + " a valid GCS path"); - options.getGcpTempLocation(); - } - - @Test - public void testDefaultGcpTempLocationDoesNotExist() { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - String tempLocation = "gs://does/not/exist"; - options.setTempLocation(tempLocation); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "Error constructing 
default value for gcpTempLocation: tempLocation is not" - + " a valid GCS path"); - thrown.expectCause( - hasMessage(containsString("Output path does not exist or is not writeable"))); - - options.getGcpTempLocation(); - } - - private static void makePropertiesFileWithProject(File path, String projectId) - throws IOException { - String properties = String.format("[core]%n" - + "account = test-account@google.com%n" - + "project = %s%n" - + "%n" - + "[dataflow]%n" - + "magic = true%n", projectId); - Files.write(properties, path, StandardCharsets.UTF_8); - } - - private static String runGetProjectTest(File path, Map environment) - throws Exception { - makePropertiesFileWithProject(path, "test-project"); - DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); - when(projectFactory.getEnvironment()).thenReturn(environment); - return projectFactory.create(PipelineOptionsFactory.create()); - } - } - - /** Tests related to determining the GCP temp location. */ - @RunWith(JUnit4.class) - public static class GcpTempLocation { - @Rule public ExpectedException thrown = ExpectedException.none(); - @Mock private GcsUtil mockGcsUtil; - @Mock private CloudResourceManager mockCrmClient; - @Mock private Projects mockProjects; - @Mock private Get mockGet; - private Project fakeProject; - private PipelineOptions options; - - @Before - public void setUp() throws Exception { - MockitoAnnotations.initMocks(this); - options = PipelineOptionsFactory.create(); - options.as(GcsOptions.class).setGcsUtil(mockGcsUtil); - options.as(GcpOptions.class).setProject("foo"); - options.as(GcpOptions.class).setZone("us-north1-a"); - when(mockCrmClient.projects()).thenReturn(mockProjects); - when(mockProjects.get(any(String.class))).thenReturn(mockGet); - fakeProject = new Project().setProjectNumber(1L); - } - - @Test - public void testCreateBucket() throws Exception { - doReturn(fakeProject).when(mockGet).execute(); - when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(1L); - 
- String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); - assertEquals("gs://dataflow-staging-us-north1-1", bucket); - } - - @Test - public void testCreateBucketProjectLookupFails() throws Exception { - doThrow(new IOException("badness")).when(mockGet).execute(); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable to verify project"); - GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); - } - - @Test - public void testCreateBucketCreateBucketFails() throws Exception { - doReturn(fakeProject).when(mockGet).execute(); - doThrow(new IOException("badness")).when( - mockGcsUtil).createBucket(any(String.class), any(Bucket.class)); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable create default bucket"); - GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); - } - - @Test - public void testCannotGetBucketOwner() throws Exception { - doReturn(fakeProject).when(mockGet).execute(); - when(mockGcsUtil.bucketOwner(any(GcsPath.class))) - .thenThrow(new IOException("badness")); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable to determine the owner"); - GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); - } - - @Test - public void testProjectMismatch() throws Exception { - doReturn(fakeProject).when(mockGet).execute(); - when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Bucket owner does not match the project"); - GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); - } - - @Test - public void regionFromZone() throws Exception { - assertEquals("us-central1", GcpTempLocationFactory.getRegionFromZone("us-central1-a")); - assertEquals("asia-east", GcpTempLocationFactory.getRegionFromZone("asia-east-a")); - } - } -}