beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [21/50] beam git commit: PackageUtil: parallelize staging of files
Date Sun, 29 Jan 2017 16:22:07 GMT
PackageUtil: parallelize staging of files

Proceeds in stages:
1. In parallel, hash and size all files.
2. Sort files by descending size.
3. In parallel, upload files.

Also a little cleanup for Dataflow 2.0:
* proper visibility
* removing some deprecated code
* refactoring into smaller methods.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3ecf7e70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3ecf7e70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3ecf7e70

Branch: refs/heads/python-sdk
Commit: 3ecf7e70bcc4775d804f096de647d13c407a8d52
Parents: 979c937
Author: Dan Halperin <dhalperi@google.com>
Authored: Mon Oct 24 17:27:23 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Jan 25 11:03:03 2017 -0800

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |   5 +
 .../beam/runners/dataflow/util/GcsStager.java   |  18 +-
 .../beam/runners/dataflow/util/PackageUtil.java | 349 ++++++++++++-------
 .../runners/dataflow/util/PackageUtilTest.java  |  42 ++-
 .../org/apache/beam/sdk/options/GcsOptions.java |   4 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  12 +
 6 files changed, 281 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index eea5502..9858b3d 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -203,6 +203,11 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-storage</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.auth</groupId>
       <artifactId>google-auth-library-credentials</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index 6ca4c3f..53822e3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -17,13 +17,19 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.api.services.storage.Storage;
 import java.util.List;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
+import org.apache.beam.sdk.util.Transport;
 
 /**
  * Utility class for staging files to GCS.
@@ -35,6 +41,7 @@ public class GcsStager implements Stager {
     this.options = options;
   }
 
+  @SuppressWarnings("unused")  // used via reflection
   public static GcsStager fromOptions(PipelineOptions options) {
     return new GcsStager(options.as(DataflowPipelineOptions.class));
   }
@@ -48,7 +55,16 @@ public class GcsStager implements Stager {
     if (windmillBinary != null) {
       filesToStage.add("windmill_main=" + windmillBinary);
     }
+    int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024);
+    checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
+    uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);
+    Storage.Builder storageBuilder = Transport.newStorageClient(options);
+    GcsUtil util = GcsUtilFactory.create(
+        storageBuilder.build(),
+        storageBuilder.getHttpRequestInitializer(),
+        options.getExecutorService(),
+        uploadSizeBytes);
     return PackageUtil.stageClasspathElements(
-        options.getFilesToStage(), options.getStagingLocation());
+        options.getFilesToStage(), options.getStagingLocation(), util);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 6d910ba..fa8c94d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -17,53 +17,62 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.fasterxml.jackson.core.Base64Variants;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.common.collect.Lists;
 import com.google.common.hash.Funnels;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.io.CountingOutputStream;
 import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.GcsIOChannelFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.ZipFiles;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Helper routines for packages. */
-public class PackageUtil {
+class PackageUtil {
   private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
   /**
    * A reasonable upper bound on the number of jars required to launch a Dataflow job.
    */
-  public static final int SANE_CLASSPATH_SIZE = 1000;
-  /**
-   * The initial interval to use between package staging attempts.
-   */
-  private static final Duration INITIAL_BACKOFF_INTERVAL = Duration.standardSeconds(5);
-  /**
-   * The maximum number of retries when staging a file.
-   */
-  private static final int MAX_RETRIES = 4;
+  private static final int SANE_CLASSPATH_SIZE = 1000;
 
   private static final FluentBackoff BACKOFF_FACTORY =
-      FluentBackoff.DEFAULT
-          .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_INTERVAL);
+      FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5));
 
   /**
    * Translates exceptions from API calls.
@@ -71,35 +80,18 @@ public class PackageUtil {
   private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
 
   /**
-   * Creates a DataflowPackage containing information about how a classpath element should be
-   * staged, including the staging destination as well as its size and hash.
-   *
-   * @param classpathElement The local path for the classpath element.
-   * @param stagingPath The base location for staged classpath elements.
-   * @param overridePackageName If non-null, use the given value as the package name
-   *                            instead of generating one automatically.
-   * @return The package.
-   */
-  @Deprecated
-  public static DataflowPackage createPackage(File classpathElement,
-      String stagingPath, String overridePackageName) {
-    return createPackageAttributes(classpathElement, stagingPath, overridePackageName)
-        .getDataflowPackage();
-  }
-
-  /**
    * Compute and cache the attributes of a classpath element that we will need to stage it.
    *
-   * @param classpathElement the file or directory to be staged.
+   * @param source the file or directory to be staged.
    * @param stagingPath The base location for staged classpath elements.
    * @param overridePackageName If non-null, use the given value as the package name
    *                            instead of generating one automatically.
+   * @return a {@link PackageAttributes} that containing metadata about the object to be staged.
    */
-  static PackageAttributes createPackageAttributes(File classpathElement,
-      String stagingPath, String overridePackageName) {
+  static PackageAttributes createPackageAttributes(File source,
+      String stagingPath, @Nullable String overridePackageName) {
     try {
-      boolean directory = classpathElement.isDirectory();
+      boolean directory = source.isDirectory();
 
       // Compute size and hash in one pass over file or directory.
       Hasher hasher = Hashing.md5().newHasher();
@@ -108,142 +100,232 @@ public class PackageUtil {
 
       if (!directory) {
         // Files are staged as-is.
-        Files.asByteSource(classpathElement).copyTo(countingOutputStream);
+        Files.asByteSource(source).copyTo(countingOutputStream);
       } else {
         // Directories are recursively zipped.
-        ZipFiles.zipDirectory(classpathElement, countingOutputStream);
+        ZipFiles.zipDirectory(source, countingOutputStream);
       }
 
       long size = countingOutputStream.getCount();
       String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
 
       // Create the DataflowPackage with staging name and location.
-      String uniqueName = getUniqueContentName(classpathElement, hash);
+      String uniqueName = getUniqueContentName(source, hash);
       String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
       DataflowPackage target = new DataflowPackage();
       target.setName(overridePackageName != null ? overridePackageName : uniqueName);
       target.setLocation(resourcePath);
 
-      return new PackageAttributes(size, hash, directory, target);
+      return new PackageAttributes(size, hash, directory, target, source.getPath());
     } catch (IOException e) {
-      throw new RuntimeException("Package setup failure for " + classpathElement, e);
+      throw new RuntimeException("Package setup failure for " + source, e);
     }
   }
 
-  /**
-   * Transfers the classpath elements to the staging location.
-   *
-   * @param classpathElements The elements to stage.
-   * @param stagingPath The base location to stage the elements to.
-   * @return A list of cloud workflow packages, each representing a classpath element.
-   */
-  public static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, String stagingPath) {
-    return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT);
-  }
-
-  // Visible for testing.
-  static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, String stagingPath,
-      Sleeper retrySleeper) {
-    LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
-        + "prepare for execution.", classpathElements.size());
-
-    if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
-      LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
-          + "copies to all workers. Having this many entries on your classpath may be indicative "
-          + "of an issue in your pipeline. You may want to consider trimming the classpath to "
-          + "necessary dependencies only, using --filesToStage pipeline option to override "
-          + "what files are being staged, or bundling several dependencies into one.",
-          classpathElements.size());
-    }
-
-    ArrayList<DataflowPackage> packages = new ArrayList<>();
+  /** Utility comparator used in uploading packages efficiently. */
+  private static class PackageUploadOrder implements Comparator<PackageAttributes> {
+    @Override
+    public int compare(PackageAttributes o1, PackageAttributes o2) {
+      // Smaller size compares high so that bigger packages are uploaded first.
+      long sizeDiff = o2.getSize() - o1.getSize();
+      if (sizeDiff != 0) {
+        // returns sign of long
+        return Long.signum(sizeDiff);
+      }
 
-    if (stagingPath == null) {
-      throw new IllegalArgumentException(
-          "Can't stage classpath elements on because no staging location has been provided");
+      // Otherwise, choose arbitrarily based on hash.
+      return o1.getHash().compareTo(o2.getHash());
     }
+  }
 
-    int numUploaded = 0;
-    int numCached = 0;
+  /**
+   * Utility function that computes sizes and hashes of packages so that we can validate whether
+   * they have already been correctly staged.
+   */
+  private static List<PackageAttributes> computePackageAttributes(
+      Collection<String> classpathElements, final String stagingPath,
+      ListeningExecutorService executorService) {
+    List<ListenableFuture<PackageAttributes>> futures = new LinkedList<>();
     for (String classpathElement : classpathElements) {
-      String packageName = null;
+      @Nullable String userPackageName = null;
       if (classpathElement.contains("=")) {
         String[] components = classpathElement.split("=", 2);
-        packageName = components[0];
+        userPackageName = components[0];
         classpathElement = components[1];
       }
+      @Nullable final String packageName = userPackageName;
 
-      File file = new File(classpathElement);
+      final File file = new File(classpathElement);
       if (!file.exists()) {
         LOG.warn("Skipping non-existent classpath element {} that was specified.",
             classpathElement);
         continue;
       }
 
-      PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
+      ListenableFuture<PackageAttributes> future =
+          executorService.submit(new Callable<PackageAttributes>() {
+            @Override
+            public PackageAttributes call() throws Exception {
+              return createPackageAttributes(file, stagingPath, packageName);
+            }
+          });
+      futures.add(future);
+    }
+
+    try {
+      return Futures.allAsList(futures).get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while staging packages", e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Error while staging packages", e.getCause());
+    }
+  }
+
+  private static WritableByteChannel makeWriter(String target, GcsUtil gcsUtil)
+      throws IOException {
+    IOChannelFactory factory = IOChannelUtils.getFactory(target);
+    if (factory instanceof GcsIOChannelFactory) {
+      return gcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY);
+    } else {
+      return factory.create(target, MimeTypes.BINARY);
+    }
+  }
 
-      DataflowPackage workflowPackage = attributes.getDataflowPackage();
-      packages.add(workflowPackage);
-      String target = workflowPackage.getLocation();
+  /**
+   * Utility to verify whether a package has already been staged and, if not, copy it to the
+   * staging location.
+   */
+  private static void stageOnePackage(
+      PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached,
+      Sleeper retrySleeper, GcsUtil gcsUtil) {
+    String source = attributes.getSourcePath();
+    String target = attributes.getDataflowPackage().getLocation();
 
-      // TODO: Should we attempt to detect the Mime type rather than
-      // always using MimeTypes.BINARY?
+    // TODO: Should we attempt to detect the Mime type rather than
+    // always using MimeTypes.BINARY?
+    try {
       try {
-        try {
-          long remoteLength = IOChannelUtils.getSizeBytes(target);
-          if (remoteLength == attributes.getSize()) {
-            LOG.debug("Skipping classpath element already staged: {} at {}",
-                classpathElement, target);
-            numCached++;
-            continue;
-          }
-        } catch (FileNotFoundException expected) {
-          // If the file doesn't exist, it means we need to upload it.
+        long remoteLength = IOChannelUtils.getSizeBytes(target);
+        if (remoteLength == attributes.getSize()) {
+          LOG.debug("Skipping classpath element already staged: {} at {}",
+              attributes.getSourcePath(), target);
+          numCached.incrementAndGet();
+          return;
         }
+      } catch (FileNotFoundException expected) {
+        // If the file doesn't exist, it means we need to upload it.
+      }
 
-        // Upload file, retrying on failure.
-        BackOff backoff = BACKOFF_FACTORY.backoff();
-        while (true) {
-          try {
-            LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
-            try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
-              copyContent(classpathElement, writer);
-            }
-            numUploaded++;
-            break;
-          } catch (IOException e) {
-            if (ERROR_EXTRACTOR.accessDenied(e)) {
-              String errorMessage = String.format(
-                  "Uploaded failed due to permissions error, will NOT retry staging "
-                  + "of classpath %s. Please verify credentials are valid and that you have "
-                  + "write access to %s. Stale credentials can be resolved by executing "
-                  + "'gcloud auth login'.", classpathElement, target);
-              LOG.error(errorMessage);
-              throw new IOException(errorMessage, e);
-            }
-            long sleep = backoff.nextBackOffMillis();
-            if (sleep == BackOff.STOP) {
-              // Rethrow last error, to be included as a cause in the catch below.
-              LOG.error("Upload failed, will NOT retry staging of classpath: {}",
-                  classpathElement, e);
-              throw e;
-            } else {
-              LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
-                  classpathElement, e);
-              retrySleeper.sleep(sleep);
-            }
+      // Upload file, retrying on failure.
+      BackOff backoff = BACKOFF_FACTORY.backoff();
+      while (true) {
+        try {
+          LOG.debug("Uploading classpath element {} to {}", source, target);
+          try (WritableByteChannel writer = makeWriter(target, gcsUtil)) {
+            copyContent(source, writer);
+          }
+          numUploaded.incrementAndGet();
+          break;
+        } catch (IOException e) {
+          if (ERROR_EXTRACTOR.accessDenied(e)) {
+            String errorMessage = String.format(
+                "Uploaded failed due to permissions error, will NOT retry staging "
+                    + "of classpath %s. Please verify credentials are valid and that you have "
+                    + "write access to %s. Stale credentials can be resolved by executing "
+                    + "'gcloud auth application-default login'.", source, target);
+            LOG.error(errorMessage);
+            throw new IOException(errorMessage, e);
+          }
+          long sleep = backoff.nextBackOffMillis();
+          if (sleep == BackOff.STOP) {
+            // Rethrow last error, to be included as a cause in the catch below.
+            LOG.error("Upload failed, will NOT retry staging of classpath: {}",
+                source, e);
+            throw e;
+          } else {
+            LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
+                source, e);
+            retrySleeper.sleep(sleep);
           }
         }
-      } catch (Exception e) {
-        throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
       }
+    } catch (Exception e) {
+      throw new RuntimeException("Could not stage classpath element: " + source, e);
     }
+  }
 
-    LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
-        + "{} files cached",
-        numUploaded, numCached);
+  /**
+   * Transfers the classpath elements to the staging location.
+   *
+   * @param classpathElements The elements to stage.
+   * @param stagingPath The base location to stage the elements to.
+   * @return A list of cloud workflow packages, each representing a classpath element.
+   */
+  static List<DataflowPackage> stageClasspathElements(
+      Collection<String> classpathElements, String stagingPath, GcsUtil gcsUtil) {
+    ListeningExecutorService executorService =
+        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32));
+    try {
+      return stageClasspathElements(
+          classpathElements, stagingPath, Sleeper.DEFAULT, executorService, gcsUtil);
+    } finally {
+      executorService.shutdown();
+    }
+  }
+
+  // Visible for testing.
+  static List<DataflowPackage> stageClasspathElements(
+      Collection<String> classpathElements, final String stagingPath,
+      final Sleeper retrySleeper, ListeningExecutorService executorService, final GcsUtil gcsUtil) {
+    LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
+        + "prepare for execution.", classpathElements.size());
+
+    if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
+      LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
+            + "copies to all workers. Having this many entries on your classpath may be indicative "
+            + "of an issue in your pipeline. You may want to consider trimming the classpath to "
+            + "necessary dependencies only, using --filesToStage pipeline option to override "
+            + "what files are being staged, or bundling several dependencies into one.",
+          classpathElements.size());
+    }
+
+    checkArgument(
+        stagingPath != null,
+        "Can't stage classpath elements because no staging location has been provided");
+
+    // Inline a copy here because the inner code returns an immutable list and we want to mutate it.
+    List<PackageAttributes> packageAttributes =
+        new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService));
+    // Order package attributes in descending size order so that we upload the largest files first.
+    Collections.sort(packageAttributes, new PackageUploadOrder());
+
+    List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size());
+    final AtomicInteger numUploaded = new AtomicInteger(0);
+    final AtomicInteger numCached = new AtomicInteger(0);
+
+    List<ListenableFuture<?>> futures = new LinkedList<>();
+    for (final PackageAttributes attributes : packageAttributes) {
+      packages.add(attributes.getDataflowPackage());
+      futures.add(executorService.submit(new Runnable() {
+        @Override
+        public void run() {
+          stageOnePackage(attributes, numUploaded, numCached, retrySleeper, gcsUtil);
+        }
+      }));
+    }
+    try {
+      Futures.allAsList(futures).get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while staging packages", e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Error while staging packages", e.getCause());
+    }
+
+    LOG.info(
+        "Staging files complete: {} files cached, {} files newly uploaded",
+        numUploaded.get(), numCached.get());
 
     return packages;
   }
@@ -293,13 +375,15 @@ public class PackageUtil {
     private final boolean directory;
     private final long size;
     private final String hash;
+    private final String sourcePath;
     private DataflowPackage dataflowPackage;
 
     public PackageAttributes(long size, String hash, boolean directory,
-        DataflowPackage dataflowPackage) {
+        DataflowPackage dataflowPackage, String sourcePath) {
       this.size = size;
       this.hash = Objects.requireNonNull(hash, "hash");
       this.directory = directory;
+      this.sourcePath = Objects.requireNonNull(sourcePath, "sourcePath");
       this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
     }
 
@@ -330,5 +414,12 @@ public class PackageUtil {
     public String getHash() {
       return hash;
     }
+
+    /**
+     * @return the file to be uploaded
+     */
+    public String getSourcePath() {
+      return sourcePath;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 05a87dd..3828415 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -18,12 +18,12 @@
 package org.apache.beam.runners.dataflow.util;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -53,6 +53,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import com.google.common.io.LineReader;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -235,7 +236,7 @@ public class PackageUtilTest {
       classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
     }
 
-    PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH);
+    PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, mockGcsUtil);
 
     logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
   }
@@ -250,7 +251,7 @@ public class PackageUtilTest {
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
     List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -277,7 +278,7 @@ public class PackageUtilTest {
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
     PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
     verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -304,7 +305,7 @@ public class PackageUtilTest {
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
     List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -327,7 +328,8 @@ public class PackageUtilTest {
     try {
       PackageUtil.stageClasspathElements(
           ImmutableList.of(tmpFile.getAbsolutePath()),
-          STAGING_PATH, fastNanoClockAndSleeper);
+          STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
+          mockGcsUtil);
     } finally {
       verify(mockGcsUtil).fileSize(any(GcsPath.class));
       verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
@@ -348,16 +350,20 @@ public class PackageUtilTest {
     try {
       PackageUtil.stageClasspathElements(
           ImmutableList.of(tmpFile.getAbsolutePath()),
-          STAGING_PATH, fastNanoClockAndSleeper);
+          STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
+          mockGcsUtil);
       fail("Expected RuntimeException");
     } catch (RuntimeException e) {
-      assertTrue("Expected IOException containing detailed message.",
-          e.getCause() instanceof IOException);
-      assertThat(e.getCause().getMessage(),
+      assertThat("Expected RuntimeException wrapping IOException.",
+          e.getCause(), instanceOf(RuntimeException.class));
+      assertThat("Expected IOException containing detailed message.",
+          e.getCause().getCause(), instanceOf(IOException.class));
+      assertThat(e.getCause().getCause().getMessage(),
           Matchers.allOf(
               Matchers.containsString("Uploaded failed due to permissions error"),
               Matchers.containsString(
-                  "Stale credentials can be resolved by executing 'gcloud auth login'")));
+                  "Stale credentials can be resolved by executing 'gcloud auth application-default "
+                      + "login'")));
     } finally {
       verify(mockGcsUtil).fileSize(any(GcsPath.class));
       verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -377,9 +383,8 @@ public class PackageUtilTest {
 
     try {
       PackageUtil.stageClasspathElements(
-                                              ImmutableList.of(tmpFile.getAbsolutePath()),
-                                              STAGING_PATH,
-                                              fastNanoClockAndSleeper);
+          ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper,
+          MoreExecutors.newDirectExecutorService(), mockGcsUtil);
     } finally {
       verify(mockGcsUtil).fileSize(any(GcsPath.class));
       verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
@@ -393,7 +398,7 @@ public class PackageUtilTest {
     when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
 
     PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH);
+        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
     verifyNoMoreInteractions(mockGcsUtil);
@@ -411,7 +416,7 @@ public class PackageUtilTest {
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
     PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH);
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
     verify(mockGcsUtil).create(any(GcsPath.class), anyString());
@@ -429,7 +434,8 @@ public class PackageUtilTest {
     when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
 
     List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH);
+        ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH,
+        mockGcsUtil);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
     verify(mockGcsUtil).fileSize(any(GcsPath.class));
@@ -446,7 +452,7 @@ public class PackageUtilTest {
     String nonExistentFile =
         IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file");
     assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
-        ImmutableList.of(nonExistentFile), STAGING_PATH));
+        ImmutableList.of(nonExistentFile), STAGING_PATH, mockGcsUtil));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
index 0553efc..72e106d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.AppEngineEnvironment;
 import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.sdk.util.GcsUtil;
@@ -81,8 +82,9 @@ public interface GcsOptions extends
       + "information on the restrictions and performance implications of this value.\n\n"
       + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
       + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
+  @Nullable
   Integer getGcsUploadBufferSizeBytes();
-  void setGcsUploadBufferSizeBytes(Integer bytes);
+  void setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
 
   /**
    * The class of the validator that should be created and used to validate paths.

http://git-wip-us.apache.org/repos/asf/beam/blob/3ecf7e70/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index a10ea28..5e83584 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -101,6 +101,18 @@ public class GcsUtil {
           gcsOptions.getExecutorService(),
           gcsOptions.getGcsUploadBufferSizeBytes());
     }
+
+    /**
+     * Returns an instance of {@link GcsUtil} based on the given parameters.
+     */
+    public static GcsUtil create(
+        Storage storageClient,
+        HttpRequestInitializer httpRequestInitializer,
+        ExecutorService executorService,
+        @Nullable Integer uploadBufferSizeBytes) {
+      return new GcsUtil(
+          storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes);
+    }
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);


Mime
View raw message