beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/7] beam git commit: [BEAM-2135] Move gcp-core to google-cloud-platform-core
Date Tue, 02 May 2017 17:57:06 GMT
Repository: beam
Updated Branches:
  refs/heads/master 9114eb3ee -> 4c3174b80


http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
new file mode 100644
index 0000000..6ffcaeb
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -0,0 +1,799 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.googleapis.batch.BatchRequest;
+import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.json.Json;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.testing.http.HttpTesting;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.client.util.BackOff;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
+import com.google.cloud.hadoop.util.ClientRequestHelper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.SocketTimeoutException;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.AccessDeniedException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Test case for {@link GcsUtil}. */
+@RunWith(JUnit4.class)
+public class GcsUtilTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testGlobTranslation() {
+    assertEquals("foo", GcsUtil.globToRegexp("foo"));
+    assertEquals("fo[^/]*o", GcsUtil.globToRegexp("fo*o"));
+    assertEquals("f[^/]*o\\.[^/]", GcsUtil.globToRegexp("f*o.?"));
+    assertEquals("foo-[0-9][^/]*", GcsUtil.globToRegexp("foo-[0-9]*"));
+  }
+
+  private static GcsOptions gcsOptionsWithTestCredential() {
+    GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
+    pipelineOptions.setGcpCredential(new TestCredential());
+    return pipelineOptions;
+  }
+
+  @Test
+  public void testCreationWithDefaultOptions() {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    assertNotNull(pipelineOptions.getGcpCredential());
+  }
+
+  @Test
+  public void testUploadBufferSizeDefault() {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil util = pipelineOptions.getGcsUtil();
+    assertNull(util.getUploadBufferSizeBytes());
+  }
+
+  @Test
+  public void testUploadBufferSizeUserSpecified() {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    pipelineOptions.setGcsUploadBufferSizeBytes(12345);
+    GcsUtil util = pipelineOptions.getGcsUtil();
+    assertEquals((Integer) 12345, util.getUploadBufferSizeBytes());
+  }
+
+  @Test
+  public void testCreationWithExecutorServiceProvided() {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    pipelineOptions.setExecutorService(Executors.newCachedThreadPool());
+    assertSame(pipelineOptions.getExecutorService(), pipelineOptions.getGcsUtil().executorService);
+  }
+
+  @Test
+  public void testCreationWithGcsUtilProvided() {
+    GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
+    GcsUtil gcsUtil = Mockito.mock(GcsUtil.class);
+    pipelineOptions.setGcsUtil(gcsUtil);
+    assertSame(gcsUtil, pipelineOptions.getGcsUtil());
+  }
+
+  @Test
+  public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception {
+    GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
+    ExecutorService executorService = pipelineOptions.getExecutorService();
+
+    int numThreads = 100;
+    final CountDownLatch[] countDownLatches = new CountDownLatch[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      final int currentLatch = i;
+      countDownLatches[i] = new CountDownLatch(1);
+      executorService.execute(
+          new Runnable() {
+            @Override
+            public void run() {
+              // Wait for latch N and then release latch N - 1
+              try {
+                countDownLatches[currentLatch].await();
+                if (currentLatch > 0) {
+                  countDownLatches[currentLatch - 1].countDown();
+                }
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    }
+
+    // Release the last latch starting the chain reaction.
+    countDownLatches[countDownLatches.length - 1].countDown();
+    executorService.shutdown();
+    assertTrue("Expected tasks to complete",
+        executorService.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testGlobExpansion() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
+    Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class);
+
+    Objects modelObjects = new Objects();
+    List<StorageObject> items = new ArrayList<>();
+    // A directory
+    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
+
+    // Files within the directory
+    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file1name"));
+    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file2name"));
+    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file3name"));
+    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile"));
+    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/anotherfile"));
+
+    modelObjects.setItems(items);
+
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket", "testdirectory/otherfile")).thenReturn(
+        mockStorageGet);
+    when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList);
+    when(mockStorageGet.execute()).thenReturn(
+        new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile"));
+    when(mockStorageList.execute()).thenReturn(modelObjects);
+
+    // Test a single file.
+    {
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile");
+      List<GcsPath> expectedFiles =
+          ImmutableList.of(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
+
+      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
+    }
+
+    // Test patterns.
+    {
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*");
+      List<GcsPath> expectedFiles = ImmutableList.of(
+          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
+
+      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
+    }
+
+    {
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*");
+      List<GcsPath> expectedFiles = ImmutableList.of(
+          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
+
+      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
+    }
+
+    {
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name");
+      List<GcsPath> expectedFiles = ImmutableList.of(
+          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
+
+      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
+    }
+
+    {
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name");
+      List<GcsPath> expectedFiles = ImmutableList.of(
+          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
+
+      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
+    }
+  }
+
+  // Patterns that contain recursive wildcards ('**') are not supported.
+  @Test
+  public void testRecursiveGlobExpansionFails() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+    GcsPath pattern = GcsPath.fromUri("gs://testbucket/test**");
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Unsupported wildcard usage");
+    gcsUtil.expand(pattern);
+  }
+
+  // GcsUtil.expand() should not fail when matching a single object that does not exist.
+  // We should return the empty result since GCS get object is strongly consistent.
+  @Test
+  public void testNonExistentObjectReturnsEmptyResult() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
+
+    GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/nonexistentfile");
+    GoogleJsonResponseException expectedException =
+        googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
+            "It don't exist", "Nothing here to see");
+
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn(
+        mockStorageGet);
+    when(mockStorageGet.execute()).thenThrow(expectedException);
+
+    assertEquals(Collections.EMPTY_LIST, gcsUtil.expand(pattern));
+  }
+
+  // GcsUtil.expand() should fail for other errors such as access denied.
+  @Test
+  public void testAccessDeniedObjectThrowsIOException() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
+
+    GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile");
+    GoogleJsonResponseException expectedException =
+        googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
+            "Waves hand mysteriously", "These aren't the buckets you're looking for");
+
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn(
+        mockStorageGet);
+    when(mockStorageGet.execute()).thenThrow(expectedException);
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Unable to get the file object for path");
+    gcsUtil.expand(pattern);
+  }
+
+  @Test
+  public void testFileSizeNonBatch() throws Exception {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
+
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute()).thenReturn(
+            new StorageObject().setSize(BigInteger.valueOf(1000)));
+
+    assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")));
+  }
+
+  @Test
+  public void testFileSizeWhenFileNotFoundNonBatch() throws Exception {
+    MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse();
+    notFoundResponse.setContent("");
+    notFoundResponse.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
+
+    MockHttpTransport mockTransport =
+            new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
+
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
+
+    thrown.expect(FileNotFoundException.class);
+    gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"));
+  }
+
+  @Test
+  public void testRetryFileSizeNonBatch() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff();
+
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute())
+            .thenThrow(new SocketTimeoutException("SocketException"))
+            .thenThrow(new SocketTimeoutException("SocketException"))
+            .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000)));
+
+    assertEquals(1000,
+        gcsUtil.getObject(
+            GcsPath.fromComponents("testbucket", "testobject"),
+            mockBackOff,
+            new FastNanoClockAndSleeper()).getSize().longValue());
+    assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis());
+  }
+
+  @Test
+  public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception {
+    JsonFactory jsonFactory = new JacksonFactory();
+
+    String contentBoundary = "batch_foobarbaz";
+    String contentBoundaryLine = "--" + contentBoundary;
+    String endOfContentBoundaryLine = "--" + contentBoundary + "--";
+
+    GenericJson error = new GenericJson()
+        .set("error", new GenericJson().set("code", 404));
+    error.setFactory(jsonFactory);
+
+    String content = contentBoundaryLine + "\n"
+        + "Content-Type: application/http\n"
+        + "\n"
+        + "HTTP/1.1 404 Not Found\n"
+        + "Content-Length: -1\n"
+        + "\n"
+        + error.toString()
+        + "\n"
+        + "\n"
+        + endOfContentBoundaryLine
+        + "\n";
+    thrown.expect(FileNotFoundException.class);
+    MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse()
+        .setContentType("multipart/mixed; boundary=" + contentBoundary)
+        .setContent(content)
+        .setStatusCode(HttpStatusCodes.STATUS_CODE_OK);
+
+    MockHttpTransport mockTransport =
+        new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
+    gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
+  }
+
+  @Test
+  public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception {
+    JsonFactory jsonFactory = new JacksonFactory();
+
+    String contentBoundary = "batch_foobarbaz";
+    String contentBoundaryLine = "--" + contentBoundary;
+    String endOfContentBoundaryLine = "--" + contentBoundary + "--";
+
+    GenericJson error = new GenericJson()
+        .set("error", new GenericJson().set("code", 404));
+    error.setFactory(jsonFactory);
+
+    String content = contentBoundaryLine + "\n"
+        + "Content-Type: application/http\n"
+        + "\n"
+        + "HTTP/1.1 404 Not Found\n"
+        + "Content-Length: -1\n"
+        + "\n"
+        + error.toString()
+        + "\n"
+        + "\n"
+        + endOfContentBoundaryLine
+        + "\n";
+    thrown.expect(FileNotFoundException.class);
+
+    final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class);
+    when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary);
+
+    // 429: Too many requests, then 200: OK.
+    when(mockResponse.getStatusCode()).thenReturn(429, 200);
+    when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content));
+
+    // A mock transport that lets us mock the API responses.
+    MockHttpTransport mockTransport =
+        new MockHttpTransport.Builder()
+            .setLowLevelHttpRequest(
+                new MockLowLevelHttpRequest() {
+                  @Override
+                  public LowLevelHttpResponse execute() throws IOException {
+                    return mockResponse;
+                  }
+                })
+            .build();
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+        gcsUtil.setStorageClient(
+        new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()));
+    gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
+  }
+
+  @Test
+  public void testCreateBucket() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.insert(
+           any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert);
+    when(mockStorageInsert.execute())
+        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenReturn(new Bucket());
+
+    gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
+  }
+
+  @Test
+  public void testCreateBucketAccessErrors() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+    GoogleJsonResponseException expectedException =
+        googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
+            "Waves hand mysteriously", "These aren't the buckets you're looking for");
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.insert(
+           any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert);
+    when(mockStorageInsert.execute())
+        .thenThrow(expectedException);
+
+    thrown.expect(AccessDeniedException.class);
+
+    gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
+  }
+
+  @Test
+  public void testBucketAccessible() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute())
+        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenReturn(new Bucket());
+
+    assertTrue(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
+        mockBackOff, new FastNanoClockAndSleeper()));
+  }
+
+  @Test
+  public void testBucketDoesNotExistBecauseOfAccessError() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+    GoogleJsonResponseException expectedException =
+        googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
+            "Waves hand mysteriously", "These aren't the buckets you're looking for");
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute())
+        .thenThrow(expectedException);
+
+    assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
+        mockBackOff, new FastNanoClockAndSleeper()));
+  }
+
+  @Test
+  public void testBucketDoesNotExist() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute())
+        .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
+            "It don't exist", "Nothing here to see"));
+
+    assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
+        mockBackOff, new FastNanoClockAndSleeper()));
+  }
+
+  @Test
+  public void testGetBucket() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute())
+        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenReturn(new Bucket());
+
+    assertNotNull(gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"),
+        mockBackOff, new FastNanoClockAndSleeper()));
+  }
+
+  @Test
+  public void testGetBucketNotExists() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute())
+        .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
+            "It don't exist", "Nothing here to see"));
+
+    thrown.expect(FileNotFoundException.class);
+    thrown.expectMessage("It don't exist");
+    gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"),
+                      mockBackOff, new FastNanoClockAndSleeper());
+  }
+
+  @Test
+  public void testGCSChannelCloseIdempotent() throws IOException {
+    SeekableByteChannel channel =
+        new GoogleCloudStorageReadChannel(null, "dummybucket", "dummyobject", null,
+        new ClientRequestHelper<StorageObject>());
+    channel.close();
+    channel.close();
+  }
+
+  /**
+   * Builds a fake GoogleJsonResponseException for testing API error handling.
+   */
+  private static GoogleJsonResponseException googleJsonResponseException(
+      final int status, final String reason, final String message) throws IOException {
+    final JsonFactory jsonFactory = new JacksonFactory();
+    HttpTransport transport = new MockHttpTransport() {
+      @Override
+      public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
+        ErrorInfo errorInfo = new ErrorInfo();
+        errorInfo.setReason(reason);
+        errorInfo.setMessage(message);
+        errorInfo.setFactory(jsonFactory);
+        GenericJson error = new GenericJson();
+        error.set("code", status);
+        error.set("errors", Arrays.asList(errorInfo));
+        error.setFactory(jsonFactory);
+        GenericJson errorResponse = new GenericJson();
+        errorResponse.set("error", error);
+        errorResponse.setFactory(jsonFactory);
+        return new MockLowLevelHttpRequest().setResponse(
+            new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString())
+            .setContentType(Json.MEDIA_TYPE).setStatusCode(status));
+        }
+    };
+    HttpRequest request =
+        transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL);
+    request.setThrowExceptionOnExecuteError(false);
+    HttpResponse response = request.execute();
+    return GoogleJsonResponseException.from(jsonFactory, response);
+  }
+
+  private static List<String> makeStrings(String s, int n) {
+    ImmutableList.Builder<String> ret = ImmutableList.builder();
+    for (int i = 0; i < n; ++i) {
+      ret.add(String.format("gs://bucket/%s%d", s, i));
+    }
+    return ret.build();
+  }
+
+  private static List<GcsPath> makeGcsPaths(String s, int n) {
+    ImmutableList.Builder<GcsPath> ret = ImmutableList.builder();
+    for (int i = 0; i < n; ++i) {
+      ret.add(GcsPath.fromUri(String.format("gs://bucket/%s%d", s, i)));
+    }
+    return ret.build();
+  }
+
+  private static int sumBatchSizes(List<BatchRequest> batches) {
+    int ret = 0;
+    for (BatchRequest b : batches) {
+      ret += b.size();
+      assertThat(b.size(), greaterThan(0));
+    }
+    return ret;
+  }
+
+  @Test
+  public void testMakeCopyBatches() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    // Small number of files fits in 1 batch
+    List<BatchRequest> batches = gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 3));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(3));
+
+    // 1 batch of files fits in 1 batch
+    batches = gcsUtil.makeCopyBatches(makeStrings("s", 100), makeStrings("d", 100));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(100));
+
+    // A little more than 5 batches of files fits in 6 batches
+    batches = gcsUtil.makeCopyBatches(makeStrings("s", 501), makeStrings("d", 501));
+    assertThat(batches.size(), equalTo(6));
+    assertThat(sumBatchSizes(batches), equalTo(501));
+  }
+
+  @Test
+  public void testInvalidCopyBatches() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Number of source files 3");
+
+    gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 1));
+  }
+
+  @Test
+  public void testMakeRemoveBatches() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    // Small number of files fits in 1 batch
+    List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(3));
+
+    // 1 batch of files fits in 1 batch
+    batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(100));
+
+    // A little more than 5 batches of files fits in 6 batches
+    batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501));
+    assertThat(batches.size(), equalTo(6));
+    assertThat(sumBatchSizes(batches), equalTo(501));
+  }
+
+  @Test
+  public void testMakeGetBatches() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    // Small number of files fits in 1 batch
+    List<StorageObjectOrIOException[]> results = Lists.newArrayList();
+    List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results);
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(3));
+    assertEquals(3, results.size());
+
+    // 1 batch of files fits in 1 batch
+    results = Lists.newArrayList();
+    batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 100), results);
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(100));
+    assertEquals(100, results.size());
+
+    // A little more than 5 batches of files fits in 6 batches
+    results = Lists.newArrayList();
+    batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 501), results);
+    assertThat(batches.size(), equalTo(6));
+    assertThat(sumBatchSizes(batches), equalTo(501));
+    assertEquals(501, results.size());
+  }
+
+  /**
+   * A helper to wrap a {@link String} of content in a UTF-8 encoded {@link InputStream}.
+   */
+  private static InputStream toStream(String content) throws IOException {
+    return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
new file mode 100644
index 0000000..37551a4
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.HttpResponseInterceptor;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.Storage.Objects.Get;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.security.PrivateKey;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for RetryHttpRequestInitializer.
+ */
+@RunWith(JUnit4.class)
+public class RetryHttpRequestInitializerTest {
+
+  @Mock private PrivateKey mockPrivateKey;
+  @Mock private LowLevelHttpRequest mockLowLevelRequest;
+  @Mock private LowLevelHttpResponse mockLowLevelResponse;
+  @Mock private HttpResponseInterceptor mockHttpResponseInterceptor;
+
+  private final JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
+  private Storage storage;
+
+  // Used to test retrying a request more than the default 10 times; yields each time twice.
+  static class MockNanoClock implements NanoClock {
+    private final int[] timesMs = {500, 750, 1125, 1688, 2531, 3797, 5695, 8543,
+        12814, 19222, 28833, 43249, 64873, 97310, 145965, 218945, 328420};
+    private int i = 0;
+
+    @Override
+    public long nanoTime() {
+      return timesMs[i++ / 2] * 1000000L; // long math: int overflows from 2531 ms upward
+    }
+  }
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+
+    HttpTransport lowLevelTransport = new HttpTransport() {
+      @Override
+      protected LowLevelHttpRequest buildRequest(String method, String url)
+          throws IOException {
+        return mockLowLevelRequest;
+      }
+    };
+
+    // Retry initializer will pass through to credential, since we can have
+    // only a single HttpRequestInitializer, and we use multiple Credential
+    // types in the SDK, not all of which allow for retry configuration.
+    RetryHttpRequestInitializer initializer = new RetryHttpRequestInitializer(
+        new MockNanoClock(), new Sleeper() {
+          @Override
+          public void sleep(long millis) throws InterruptedException {}
+        }, Arrays.asList(418 /* I'm a teapot */), mockHttpResponseInterceptor);
+    storage = new Storage.Builder(lowLevelTransport, jsonFactory, initializer)
+        .setApplicationName("test").build();
+  }
+
+  @After
+  public void tearDown() {
+    verifyNoMoreInteractions(mockPrivateKey);
+    verifyNoMoreInteractions(mockLowLevelRequest);
+    verifyNoMoreInteractions(mockHttpResponseInterceptor);
+  }
+
+  @Test
+  public void testBasicOperation() throws IOException {
+    when(mockLowLevelRequest.execute())
+        .thenReturn(mockLowLevelResponse);
+    when(mockLowLevelResponse.getStatusCode())
+        .thenReturn(200);
+
+    Storage.Buckets.Get result = storage.buckets().get("test");
+    HttpResponse response = result.executeUnparsed();
+    assertNotNull(response);
+
+    verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
+    verify(mockLowLevelRequest, atLeastOnce())
+        .addHeader(anyString(), anyString());
+    verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
+    verify(mockLowLevelRequest).execute();
+    verify(mockLowLevelResponse).getStatusCode();
+  }
+
+  /**
+   * Tests that a non-retriable error is not retried and is reported as an exception.
+   */
+  @Test
+  public void testErrorCodeForbidden() throws IOException {
+    when(mockLowLevelRequest.execute())
+        .thenReturn(mockLowLevelResponse);
+    when(mockLowLevelResponse.getStatusCode())
+        .thenReturn(403)  // Non-retryable error.
+        .thenReturn(200); // Shouldn't happen.
+
+    try {
+      Storage.Buckets.Get result = storage.buckets().get("test");
+      result.executeUnparsed();
+      fail("Expected an HttpResponseException for the 403 response");
+    } catch (HttpResponseException e) {
+      Assert.assertThat(e.getMessage(), Matchers.containsString("403"));
+    }
+
+    verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
+    verify(mockLowLevelRequest, atLeastOnce())
+        .addHeader(anyString(), anyString());
+    verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
+    verify(mockLowLevelRequest).execute();
+    verify(mockLowLevelResponse).getStatusCode();
+  }
+
+  /**
+   * Tests that a retriable error is retried.
+   */
+  @Test
+  public void testRetryableError() throws IOException {
+    when(mockLowLevelRequest.execute())
+        .thenReturn(mockLowLevelResponse)
+        .thenReturn(mockLowLevelResponse)
+        .thenReturn(mockLowLevelResponse);
+    when(mockLowLevelResponse.getStatusCode())
+        .thenReturn(503)  // Retryable
+        .thenReturn(429)  // We also retry on 429 Too Many Requests.
+        .thenReturn(200);
+
+    Storage.Buckets.Get result = storage.buckets().get("test");
+    HttpResponse response = result.executeUnparsed();
+    assertNotNull(response);
+
+    verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
+    verify(mockLowLevelRequest, atLeastOnce())
+        .addHeader(anyString(), anyString());
+    verify(mockLowLevelRequest, times(3)).setTimeout(anyInt(), anyInt());
+    verify(mockLowLevelRequest, times(3)).execute();
+    verify(mockLowLevelResponse, times(3)).getStatusCode();
+  }
+
+  /**
+   * Tests that an IOException is retried.
+   */
+  @Test
+  public void testThrowIOException() throws IOException {
+    when(mockLowLevelRequest.execute())
+        .thenThrow(new IOException("Fake Error"))
+        .thenReturn(mockLowLevelResponse);
+    when(mockLowLevelResponse.getStatusCode())
+        .thenReturn(200);
+
+    Storage.Buckets.Get result = storage.buckets().get("test");
+    HttpResponse response = result.executeUnparsed();
+    assertNotNull(response);
+
+    verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
+    verify(mockLowLevelRequest, atLeastOnce())
+        .addHeader(anyString(), anyString());
+    verify(mockLowLevelRequest, times(2)).setTimeout(anyInt(), anyInt());
+    verify(mockLowLevelRequest, times(2)).execute();
+    verify(mockLowLevelResponse).getStatusCode();
+  }
+
+  /**
+   * Tests that a run of retryable errors keeps being retried until the request succeeds.
+   */
+  @Test
+  public void testRetryableErrorRetryEnoughTimes() throws IOException {
+    final int attempts = 10;
+    when(mockLowLevelRequest.execute()).thenReturn(mockLowLevelResponse);
+    when(mockLowLevelResponse.getStatusCode()).thenAnswer(new Answer<Integer>() {
+      private int calls = 0;
+      @Override
+      public Integer answer(InvocationOnMock invocation) {
+        return (calls++ >= attempts - 1) ? 200 : 503;
+      }});
+
+    HttpResponse response =
+        storage.buckets().get("test").executeUnparsed();
+    assertNotNull(response);
+
+    verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
+    verify(mockLowLevelRequest, atLeastOnce())
+        .addHeader(anyString(), anyString());
+    verify(mockLowLevelRequest, times(attempts)).setTimeout(anyInt(), anyInt());
+    verify(mockLowLevelRequest, times(attempts)).execute();
+    verify(mockLowLevelResponse, times(attempts)).getStatusCode();
+  }
+
+  /**
+   * Tests that when RPCs fail with {@link SocketTimeoutException}, the IO exception handler
+   * is invoked.
+   */
+  @Test
+  public void testIOExceptionHandlerIsInvokedOnTimeout() throws Exception {
+    // Counts the number of calls to execute the HTTP request.
+    final AtomicLong executeCount = new AtomicLong();
+
+    // 10 is a private internal constant in the Google API Client library. See
+    // com.google.api.client.http.HttpRequest#setNumberOfRetries
+    // TODO: update this test once the private internal constant is public.
+    final int defaultNumberOfRetries = 10;
+
+    // A mock HTTP request that always throws SocketTimeoutException.
+    MockHttpTransport transport =
+        new MockHttpTransport.Builder().setLowLevelHttpRequest(new MockLowLevelHttpRequest() {
+          @Override
+          public LowLevelHttpResponse execute() throws IOException {
+            executeCount.incrementAndGet();
+            throw new SocketTimeoutException("Fake forced timeout exception");
+          }
+        }).build();
+
+    // A sample HTTP request to Google Cloud Storage that uses both default Transport and default
+    // RetryHttpInitializer.
+    Storage storage = new Storage.Builder(
+        transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build();
+
+    Get getRequest = storage.objects().get("gs://fake", "file");
+
+    try {
+      getRequest.execute();
+      fail("Expected a SocketTimeoutException once all retries were exhausted");
+    } catch (IOException e) { // not Throwable: lets the AssertionError from fail() propagate
+      assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class));
+      assertEquals(1 + defaultNumberOfRetries, executeCount.get());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java
new file mode 100644
index 0000000..426fb16
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.gcsfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests of GcsPath.
+ */
+@RunWith(JUnit4.class)
+public class GcsPathTest {
+
+  /**
+   * A GCS URI paired with the components (bucket first) used to check parsing and building.
+   */
+  static final class TestCase {
+
+    final String uri;
+    final String expectedBucket;
+    final String expectedObject;
+    final String[] namedComponents;
+
+    TestCase(String uri, String... namedComponents) {
+      this.uri = uri;
+      this.expectedBucket = namedComponents[0];
+      this.namedComponents = namedComponents;
+      this.expectedObject = uri.substring(expectedBucket.length() + 6);  // 6 = "gs://" + "/"
+    }
+  }
+
+  // Each test case is an expected URL, then the components used to build it.
+  // Empty components result in a double slash.
+  static final List<TestCase> PATH_TEST_CASES = Arrays.asList(
+      new TestCase("gs://bucket/then/object", "bucket", "then", "object"),
+      new TestCase("gs://bucket//then/object", "bucket", "", "then", "object"),
+      new TestCase("gs://bucket/then//object", "bucket", "then", "", "object"),
+      new TestCase("gs://bucket/then///object", "bucket", "then", "", "", "object"),
+      new TestCase("gs://bucket/then/object/", "bucket", "then", "object/"),
+      new TestCase("gs://bucket/then/object/", "bucket", "then/", "object/"),
+      new TestCase("gs://bucket/then/object//", "bucket", "then", "object", ""),
+      new TestCase("gs://bucket/then/object//", "bucket", "then", "object/", ""),
+      new TestCase("gs://bucket/", "bucket")
+  );
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testGcsPathParsing() throws Exception {
+    for (TestCase testCase : PATH_TEST_CASES) {
+      String uriString = testCase.uri;
+
+      GcsPath path = GcsPath.fromUri(URI.create(uriString));
+      // Deconstruction - check bucket, object, and components.
+      assertEquals(testCase.expectedBucket, path.getBucket());
+      assertEquals(testCase.expectedObject, path.getObject());
+      assertEquals(testCase.uri,
+          testCase.namedComponents.length, path.getNameCount());
+
+      // Construction - check that the path can be built from components.
+      GcsPath built = GcsPath.fromComponents(null, null);
+      for (String component : testCase.namedComponents) {
+        built = built.resolve(component);
+      }
+      assertEquals(testCase.uri, built.toString());
+    }
+  }
+
+  @Test
+  public void testParentRelationship() throws Exception {
+    GcsPath path = GcsPath.fromComponents("bucket", "then/object");
+    assertEquals("bucket", path.getBucket());
+    assertEquals("then/object", path.getObject());
+    assertEquals(3, path.getNameCount());
+    assertTrue(path.endsWith("object"));
+    assertTrue(path.startsWith("bucket/then"));
+
+    GcsPath parent = path.getParent();  // gs://bucket/then/
+    assertEquals("bucket", parent.getBucket());
+    assertEquals("then/", parent.getObject());
+    assertEquals(2, parent.getNameCount());
+    assertThat(path, Matchers.not(Matchers.equalTo(parent)));
+    assertTrue(path.startsWith(parent));
+    assertFalse(parent.startsWith(path));
+    assertTrue(parent.endsWith("then/"));
+    assertTrue(parent.startsWith("bucket/then"));
+    assertTrue(parent.isAbsolute());
+
+    GcsPath root = path.getRoot();
+    assertEquals(0, root.getNameCount());
+    assertEquals("gs://", root.toString());
+    assertEquals("", root.getBucket());
+    assertEquals("", root.getObject());
+    assertTrue(root.isAbsolute());
+    assertThat(root, Matchers.equalTo(parent.getRoot()));
+
+    GcsPath grandParent = parent.getParent();  // gs://bucket/
+    assertEquals(1, grandParent.getNameCount());
+    assertEquals("gs://bucket/", grandParent.toString());
+    assertTrue(grandParent.isAbsolute());
+    assertThat(root, Matchers.equalTo(grandParent.getParent()));
+    assertThat(root.getParent(), Matchers.nullValue());
+
+    assertTrue(path.startsWith(path.getRoot()));
+    assertTrue(parent.startsWith(path.getRoot()));
+  }
+
+  @Test
+  public void testRelativeParent() throws Exception {
+    GcsPath path = GcsPath.fromComponents(null, "a/b");
+    GcsPath parent = path.getParent();
+    assertEquals("a/", parent.toString());
+
+    GcsPath grandParent = parent.getParent();
+    assertNull(grandParent);
+  }
+
+  @Test
+  public void testUriSupport() throws Exception {
+    URI uri = URI.create("gs://bucket/some/path");
+
+    GcsPath path = GcsPath.fromUri(uri);
+    assertEquals("bucket", path.getBucket());
+    assertEquals("some/path", path.getObject());
+
+    URI reconstructed = path.toUri();
+    assertEquals(uri, reconstructed);
+
+    path = GcsPath.fromUri("gs://bucket");
+    assertEquals("gs://bucket/", path.toString());
+  }
+
+  @Test
+  public void testBucketParsing() throws Exception {
+    GcsPath path = GcsPath.fromUri("gs://bucket");
+    GcsPath path2 = GcsPath.fromUri("gs://bucket/");
+
+    assertEquals(path, path2);
+    assertEquals(path.toString(), path2.toString());
+    assertEquals(path.toUri(), path2.toUri());
+  }
+
+  @Test
+  public void testGcsPathToString() throws Exception {
+    String filename = "gs://some_bucket/some/file.txt";
+    GcsPath path = GcsPath.fromUri(filename);
+    assertEquals(filename, path.toString());
+  }
+
+  @Test
+  public void testEquals() {
+    GcsPath a = GcsPath.fromComponents(null, "a/b/c");
+    GcsPath a2 = GcsPath.fromComponents(null, "a/b/c");
+    assertFalse(a.isAbsolute());
+    assertFalse(a2.isAbsolute());
+
+    GcsPath b = GcsPath.fromComponents("bucket", "a/b/c");
+    GcsPath b2 = GcsPath.fromComponents("bucket", "a/b/c");
+    assertTrue(b.isAbsolute());
+    assertTrue(b2.isAbsolute());
+
+    assertEquals(a, a);  // Reflexive.
+    assertThat(a, Matchers.not(Matchers.equalTo(b)));
+    assertThat(b, Matchers.not(Matchers.equalTo(a)));
+
+    assertEquals(a, a2);  // Symmetric for equal relative paths...
+    assertEquals(a2, a);
+    assertEquals(b, b2);  // ...and for equal absolute paths.
+    assertEquals(b2, b);
+
+    assertThat(a, Matchers.not(Matchers.equalTo(Paths.get("/tmp/foo"))));
+    assertFalse(a.equals(null));  // Was "a != null", which could never fail.
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidGcsPath() {
+    @SuppressWarnings("unused")
+    GcsPath filename =
+        GcsPath.fromUri("file://invalid/gcs/path");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidBucket() {
+    GcsPath.fromComponents("invalid/", "");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidObject_newline() {
+    GcsPath.fromComponents(null, "a\nb");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidObject_cr() {
+    GcsPath.fromComponents(null, "a\rb");
+  }
+
+  @Test
+  public void testResolveUri() {
+    GcsPath path = GcsPath.fromComponents("bucket", "a/b/c");
+    GcsPath d = path.resolve("gs://bucket2/d");
+    assertEquals("gs://bucket2/d", d.toString());
+  }
+
+  @Test
+  public void testResolveOther() {
+    GcsPath a = GcsPath.fromComponents("bucket", "a");
+    GcsPath b = a.resolve(Paths.get("b"));
+    assertEquals("a/b", b.getObject());
+  }
+
+  @Test
+  public void testGetFileName() {
+    assertEquals("foo", GcsPath.fromUri("gs://bucket/bar/foo").getFileName().toString());
+    assertEquals("foo", GcsPath.fromUri("gs://bucket/foo").getFileName().toString());
+    thrown.expect(UnsupportedOperationException.class);
+    GcsPath.fromUri("gs://bucket/").getFileName();
+  }
+
+  @Test
+  public void testResolveSibling() {
+    assertEquals(
+        "gs://bucket/bar/moo",
+        GcsPath.fromUri("gs://bucket/bar/foo").resolveSibling("moo").toString());
+    assertEquals(
+        "gs://bucket/moo",
+        GcsPath.fromUri("gs://bucket/foo").resolveSibling("moo").toString());
+    thrown.expect(UnsupportedOperationException.class);
+    GcsPath.fromUri("gs://bucket/").resolveSibling("moo");
+  }
+
+  @Test
+  public void testCompareTo() {
+    GcsPath a = GcsPath.fromComponents("bucket", "a");
+    GcsPath b = GcsPath.fromComponents("bucket", "b");
+    GcsPath b2 = GcsPath.fromComponents("bucket2", "b");
+    GcsPath brel = GcsPath.fromComponents(null, "b");
+    GcsPath a2 = GcsPath.fromComponents("bucket", "a");
+    GcsPath arel = GcsPath.fromComponents(null, "a");
+
+    assertThat(a.compareTo(b), Matchers.lessThan(0));
+    assertThat(b.compareTo(a), Matchers.greaterThan(0));
+    assertThat(a.compareTo(a2), Matchers.equalTo(0));
+
+    assertThat(a.hashCode(), Matchers.equalTo(a2.hashCode()));
+    assertThat(a.hashCode(), Matchers.not(Matchers.equalTo(b.hashCode())));
+    assertThat(b.hashCode(), Matchers.not(Matchers.equalTo(brel.hashCode())));
+
+    assertThat(brel.compareTo(b), Matchers.lessThan(0));
+    assertThat(b.compareTo(brel), Matchers.greaterThan(0));
+    assertThat(arel.compareTo(brel), Matchers.lessThan(0));
+    assertThat(brel.compareTo(arel), Matchers.greaterThan(0));
+
+    assertThat(b.compareTo(b2), Matchers.lessThan(0));
+    assertThat(b2.compareTo(b), Matchers.greaterThan(0));
+  }
+
+  @Test
+  public void testCompareTo_ordering() {
+    GcsPath ab = GcsPath.fromComponents("bucket", "a/b");
+    GcsPath abc = GcsPath.fromComponents("bucket", "a/b/c");
+    GcsPath a1b = GcsPath.fromComponents("bucket", "a-1/b");
+
+    assertThat(ab.compareTo(a1b), Matchers.lessThan(0));
+    assertThat(a1b.compareTo(ab), Matchers.greaterThan(0));
+
+    assertThat(ab.compareTo(abc), Matchers.lessThan(0));
+    assertThat(abc.compareTo(ab), Matchers.greaterThan(0));
+  }
+
+  @Test
+  public void testCompareTo_buckets() {
+    GcsPath a = GcsPath.fromComponents(null, "a/b/c");
+    GcsPath b = GcsPath.fromComponents("bucket", "a/b/c");
+
+    assertThat(a.compareTo(b), Matchers.lessThan(0));
+    assertThat(b.compareTo(a), Matchers.greaterThan(0));
+  }
+
+  @Test
+  public void testIterator() {
+    GcsPath a = GcsPath.fromComponents("bucket", "a/b/c");
+    Iterator<Path> it = a.iterator();
+
+    assertTrue(it.hasNext());
+    assertEquals("gs://bucket/", it.next().toString());
+    assertTrue(it.hasNext());
+    assertEquals("a", it.next().toString());
+    assertTrue(it.hasNext());
+    assertEquals("b", it.next().toString());
+    assertTrue(it.hasNext());
+    assertEquals("c", it.next().toString());
+    assertFalse(it.hasNext());
+  }
+
+  @Test
+  public void testSubpath() {
+    GcsPath a = GcsPath.fromComponents("bucket", "a/b/c/d");
+    assertThat(a.subpath(0, 1).toString(), Matchers.equalTo("gs://bucket/"));
+    assertThat(a.subpath(0, 2).toString(), Matchers.equalTo("gs://bucket/a"));
+    assertThat(a.subpath(0, 3).toString(), Matchers.equalTo("gs://bucket/a/b"));
+    assertThat(a.subpath(0, 4).toString(), Matchers.equalTo("gs://bucket/a/b/c"));
+    assertThat(a.subpath(1, 2).toString(), Matchers.equalTo("a"));
+    assertThat(a.subpath(2, 3).toString(), Matchers.equalTo("b"));
+    assertThat(a.subpath(2, 4).toString(), Matchers.equalTo("b/c"));
+    assertThat(a.subpath(2, 5).toString(), Matchers.equalTo("b/c/d"));
+  }
+
+  @Test
+  public void testGetName() {
+    GcsPath a = GcsPath.fromComponents("bucket", "a/b/c/d");
+    assertEquals(5, a.getNameCount());
+    assertThat(a.getName(0).toString(), Matchers.equalTo("gs://bucket/"));
+    assertThat(a.getName(1).toString(), Matchers.equalTo("a"));
+    assertThat(a.getName(2).toString(), Matchers.equalTo("b"));
+    assertThat(a.getName(3).toString(), Matchers.equalTo("c"));
+    assertThat(a.getName(4).toString(), Matchers.equalTo("d"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testSubPathError() {
+    GcsPath a = GcsPath.fromComponents("bucket", "a/b/c/d");
+    a.subpath(1, 1); // throws IllegalArgumentException
+    Assert.fail();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 8a48eca..4c60f20 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -32,7 +32,7 @@
   <name>Apache Beam :: SDKs :: Java :: Extensions</name>
 
   <modules>
-    <module>gcp-core</module>
+    <module>google-cloud-platform-core</module>
     <module>jackson</module>
     <module>join-library</module>
     <module>protobuf</module>


Mime
View raw message