beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/3] beam git commit: Splits large TextIOTest into TextIOReadTest and TextIOWriteTest
Date Tue, 18 Jul 2017 03:08:36 GMT
Repository: beam
Updated Branches:
  refs/heads/master 0f06eb25b -> 7c3631810


http://git-wip-us.apache.org/repos/asf/beam/blob/d495d151/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
new file mode 100644
index 0000000..a73ed7d
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -0,0 +1,604 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY;
+import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
+import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+/** Tests for {@link TextIO.Write}. */
+public class TextIOWriteTest {
+  private static final String MY_HEADER = "myHeader"; // Header line used by the header tests.
+  private static final String MY_FOOTER = "myFooter"; // Footer line used by the footer tests.
+
+  private static Path tempFolder; // Created in setupClass(); deleted recursively in teardownClass().
+
+  @Rule public TestPipeline p = TestPipeline.create(); // Pipeline the tests apply transforms to.
+
+  @Rule public ExpectedException expectedException = ExpectedException.none(); // Not referenced by any test here.
+
+  /**
+   * Creates the class-wide scratch directory under which the write tests create their own
+   * per-test directories.
+   *
+   * <p>The directory prefix names this class, so a directory left behind by an aborted run can be
+   * traced back to it.
+   *
+   * @throws IOException if the temporary directory cannot be created
+   */
+  @BeforeClass
+  public static void setupClass() throws IOException {
+    tempFolder = Files.createTempDirectory("TextIOWriteTest");
+  }
+
+  @AfterClass
+  public static void teardownClass() throws IOException {
+    Files.walkFileTree(
+        tempFolder,
+        new SimpleFileVisitor<Path>() {
+          @Override
+          public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+              throws IOException {
+            Files.delete(file);
+            return FileVisitResult.CONTINUE;
+          }
+
+          @Override
+          public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException
{
+            Files.delete(dir);
+            return FileVisitResult.CONTINUE;
+          }
+        });
+  }
+
+  static class TestDynamicDestinations extends FileBasedSink.DynamicDestinations<String,
String> {
+    ResourceId baseDir;
+
+    TestDynamicDestinations(ResourceId baseDir) {
+      this.baseDir = baseDir;
+    }
+
+    @Override
+    public String getDestination(String element) {
+      // Destination is based on first character of string.
+      return element.substring(0, 1);
+    }
+
+    @Override
+    public String getDefaultDestination() {
+      return "";
+    }
+
+    @Nullable
+    @Override
+    public Coder<String> getDestinationCoder() {
+      return StringUtf8Coder.of();
+    }
+
+    @Override
+    public FileBasedSink.FilenamePolicy getFilenamePolicy(String destination) {
+      return DefaultFilenamePolicy.fromStandardParameters(
+          ValueProvider.StaticValueProvider.of(
+              baseDir.resolve(
+                  "file_" + destination + ".txt",
+                  ResolveOptions.StandardResolveOptions.RESOLVE_FILE)),
+          null,
+          null,
+          false);
+    }
+  }
+
+  /**
+   * Predicate that accepts strings beginning with a fixed prefix.
+   *
+   * <p>Declared {@code static} so that instances do not hold a reference to the enclosing test
+   * instance.
+   */
+  static class StartsWith implements Predicate<String> {
+    private final String prefix;
+
+    StartsWith(String prefix) {
+      this.prefix = prefix;
+    }
+
+    /** Returns {@code true} iff {@code input} is non-null and starts with the prefix. */
+    @Override
+    public boolean apply(@Nullable String input) {
+      // The parameter is declared @Nullable, so treat null as "no match" rather than throwing.
+      return input != null && input.startsWith(prefix);
+    }
+  }
+
+  /**
+   * Writes six strings through {@link TestDynamicDestinations} and checks that the elements
+   * starting with "a", "b" and "c" end up under {@code file_a.txt}, {@code file_b.txt} and
+   * {@code file_c.txt} respectively.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinations() throws Exception {
+    String scratchDir =
+        Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString();
+    ResourceId baseDir = FileSystems.matchNewResource(scratchDir, true);
+
+    List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
+    PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of()));
+    input.apply(
+        TextIO.write()
+            .to(new TestDynamicDestinations(baseDir))
+            .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
+    p.run();
+
+    // The expectation has the same shape for every destination: the elements whose first
+    // character is the prefix, with no header or footer, matched by glob (numShards == 0).
+    for (String prefix : Arrays.asList("a", "b", "c")) {
+      assertOutputFiles(
+          Iterables.toArray(Iterables.filter(elements, new StartsWith(prefix)), String.class),
+          null,
+          null,
+          0,
+          baseDir.resolve(
+              "file_" + prefix + ".txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
+          DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+    }
+  }
+
+  /**
+   * Element type for the custom-type write test: a destination key plus a metadata string.
+   *
+   * <p>NOTE(review): the no-argument constructor and non-final fields are presumably what
+   * {@link AvroCoder} needs to instantiate and populate this type reflectively - confirm before
+   * tightening them.
+   */
+  @DefaultCoder(AvroCoder.class)
+  private static class UserWriteType {
+    String destination;
+    String metadata;
+
+    /** Creates an instance with both fields set to the empty string. */
+    UserWriteType() {
+      this("", "");
+    }
+
+    UserWriteType(String destination, String metadata) {
+      this.destination = destination;
+      this.metadata = metadata;
+    }
+
+    /** Renders both fields; this is the exact line the write test expects in the output. */
+    @Override
+    public String toString() {
+      return "destination: " + destination + " metadata : " + metadata;
+    }
+  }
+
+  private static class SerializeUserWrite implements SerializableFunction<UserWriteType,
String> {
+    @Override
+    public String apply(UserWriteType input) {
+      return input.toString();
+    }
+  }
+
+  private static class UserWriteDestination
+      implements SerializableFunction<UserWriteType, DefaultFilenamePolicy.Params>
{
+    private ResourceId baseDir;
+
+    UserWriteDestination(ResourceId baseDir) {
+      this.baseDir = baseDir;
+    }
+
+    @Override
+    public DefaultFilenamePolicy.Params apply(UserWriteType input) {
+      return new DefaultFilenamePolicy.Params()
+          .withBaseFilename(
+              baseDir.resolve(
+                  "file_" + input.destination.substring(0, 1) + ".txt",
+                  ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
+    }
+  }
+
+  private static class ExtractWriteDestination implements Function<UserWriteType, String>
{
+    @Override
+    public String apply(@Nullable UserWriteType input) {
+      return input.destination;
+    }
+  }
+
+  /**
+   * Writes six {@link UserWriteType} elements with {@code TextIO.writeCustomType}, routing them
+   * through {@link UserWriteDestination}, and checks that the elements whose destination starts
+   * with "a", "b" and "c" end up under {@code file_a.txt}, {@code file_b.txt} and {@code
+   * file_c.txt} respectively, each rendered with {@code toString()}.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDefaultFilenamePolicy() throws Exception {
+    // Name the scratch directory after this test, not after testDynamicDestinations.
+    ResourceId baseDir =
+        FileSystems.matchNewResource(
+            Files.createTempDirectory(tempFolder, "testDynamicDefaultFilenamePolicy").toString(),
+            true);
+
+    List<UserWriteType> elements =
+        Lists.newArrayList(
+            new UserWriteType("aaaa", "first"),
+            new UserWriteType("aaab", "second"),
+            new UserWriteType("baaa", "third"),
+            new UserWriteType("baab", "fourth"),
+            new UserWriteType("caaa", "fifth"),
+            new UserWriteType("caab", "sixth"));
+    PCollection<UserWriteType> input = p.apply(Create.of(elements));
+    input.apply(
+        TextIO.writeCustomType(new SerializeUserWrite())
+            .to(new UserWriteDestination(baseDir), new DefaultFilenamePolicy.Params())
+            .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
+    p.run();
+
+    // The expectation has the same shape for every destination: the string form of the elements
+    // whose destination starts with the prefix, with no header or footer, matched by glob.
+    for (String prefix : Arrays.asList("a", "b", "c")) {
+      String[] expected =
+          Iterables.toArray(
+              Iterables.transform(
+                  Iterables.filter(
+                      elements,
+                      Predicates.compose(new StartsWith(prefix), new ExtractWriteDestination())),
+                  Functions.toStringFunction()),
+              String.class);
+      assertOutputFiles(
+          expected,
+          null,
+          null,
+          0,
+          baseDir.resolve(
+              "file_" + prefix + ".txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
+          DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+    }
+  }
+
+  /** Writes {@code elems} to a single unsharded file with no header or footer and verifies it. */
+  private void runTestWrite(String[] elems) throws Exception {
+    runTestWrite(elems, 1);
+  }
+
+  private void runTestWrite(String[] elems, int numShards) throws Exception {
+    runTestWrite(elems, null, null, numShards);
+  }
+
+  private void runTestWrite(String[] elems, String header, String footer) throws Exception
{
+    runTestWrite(elems, header, footer, 1);
+  }
+
+  /**
+   * Runs a pipeline that writes {@code elems} with {@code TextIO.write()} into a fresh directory
+   * under the class scratch folder, then verifies the files with {@code assertOutputFiles}.
+   *
+   * @param elems lines to write
+   * @param header header passed to {@code withHeader}; may be {@code null}
+   * @param footer footer passed to {@code withFooter}; may be {@code null}
+   * @param numShards {@code 1} selects {@code withoutSharding()}; a larger value selects {@code
+   *     withNumShards(numShards)} with the {@code INDEX_OF_MAX} shard template; any other value
+   *     leaves the sharding configuration untouched
+   */
+  private void runTestWrite(String[] elems, String header, String footer, int numShards)
+      throws Exception {
+    Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
+    String outputPath = baseDir.resolve("file.txt").toString();
+    ResourceId baseFilename = FileBasedSink.convertToFileResourceIfPossible(outputPath);
+
+    TextIO.Write write = TextIO.write().to(baseFilename).withHeader(header).withFooter(footer);
+    if (numShards > 1) {
+      write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX);
+    } else if (numShards == 1) {
+      write = write.withoutSharding();
+    }
+
+    p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of())).apply(write);
+    p.run();
+
+    // Fall back to the default unwindowed template when the write did not set one explicitly.
+    String shardTemplate =
+        firstNonNull(
+            write.inner.getShardTemplate(),
+            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+    assertOutputFiles(elems, header, footer, numShards, baseFilename, shardTemplate);
+  }
+
+  /**
+   * Reads back the files produced by a write and asserts on their contents.
+   *
+   * <p>The set of files is determined first: when {@code numShards} is {@code 0} every file
+   * matching {@code outputPrefix + "*"} is used, otherwise the {@code numShards} names built by
+   * {@code DefaultFilenamePolicy.constructName} are used. Each file is read line by line. The
+   * method then asserts that the lines of all files, minus one header and one footer line per
+   * file when those are non-null, equal {@code elems} in any order, and that every file starts
+   * with {@code header} and ends with {@code footer} when those are non-null.
+   *
+   * <p>Files are decoded as UTF-8 rather than with the platform default charset, so the result
+   * does not depend on the machine running the test. NOTE(review): this presumes TextIO writes
+   * UTF-8 - confirm.
+   *
+   * @param elems expected lines, excluding header and footer
+   * @param header expected first line of every file, or {@code null} for none
+   * @param footer expected last line of every file, or {@code null} for none
+   * @param numShards number of shard files to expect, or {@code 0} to glob on the prefix
+   * @param outputPrefix base output resource
+   * @param shardNameTemplate shard template used to build file names when {@code numShards > 0}
+   * @throws Exception if matching, reading or encoding fails
+   */
+  private static void assertOutputFiles(
+      String[] elems,
+      final String header,
+      final String footer,
+      int numShards,
+      ResourceId outputPrefix,
+      String shardNameTemplate)
+      throws Exception {
+    List<File> expectedFiles = new ArrayList<>();
+    if (numShards == 0) {
+      String pattern = outputPrefix.toString() + "*";
+      List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern));
+      for (Metadata expectedFile : Iterables.getOnlyElement(matches).metadata()) {
+        expectedFiles.add(new File(expectedFile.resourceId().toString()));
+      }
+    } else {
+      for (int i = 0; i < numShards; i++) {
+        expectedFiles.add(
+            new File(
+                DefaultFilenamePolicy.constructName(
+                        outputPrefix, shardNameTemplate, "", i, numShards, null, null)
+                    .toString()));
+      }
+    }
+
+    List<List<String>> actual = new ArrayList<>();
+    for (File tmpFile : expectedFiles) {
+      // An explicit charset is required here: FileReader would use the platform default.
+      try (BufferedReader reader =
+          Files.newBufferedReader(tmpFile.toPath(), StandardCharsets.UTF_8)) {
+        List<String> currentFile = new ArrayList<>();
+        for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+          currentFile.add(line);
+        }
+        actual.add(currentFile);
+      }
+    }
+
+    List<String> expectedElements = new ArrayList<>(elems.length);
+    for (String elem : elems) {
+      byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
+      // Decode with the charset the coder is named for, not the platform default.
+      expectedElements.add(new String(encodedElem, StandardCharsets.UTF_8));
+    }
+
+    List<String> actualElements =
+        Lists.newArrayList(
+            Iterables.concat(
+                FluentIterable.from(actual)
+                    .transform(removeHeaderAndFooter(header, footer))
+                    .toList()));
+
+    assertThat(actualElements, containsInAnyOrder(expectedElements.toArray()));
+
+    assertTrue(Iterables.all(actual, haveProperHeaderAndFooter(header, footer)));
+  }
+
+  /**
+   * Returns a function that copies a file's lines and drops the first line when {@code header}
+   * is non-null and the last line when {@code footer} is non-null.
+   *
+   * <p>Lines are removed only while the copy is non-empty, so a file that is too short yields an
+   * empty list instead of throwing {@link IndexOutOfBoundsException}; the content assertions then
+   * report the mismatch. The function does not check what the dropped lines contain.
+   *
+   * @param header expected header, or {@code null} if files have no header line
+   * @param footer expected footer, or {@code null} if files have no footer line
+   */
+  private static Function<List<String>, List<String>> removeHeaderAndFooter(
+      final String header, final String footer) {
+    return new Function<List<String>, List<String>>() {
+      @Nullable
+      @Override
+      public List<String> apply(List<String> lines) {
+        // Work on a copy so the caller's list still contains the header and footer.
+        List<String> newLines = Lists.newArrayList(lines);
+        if (header != null && !newLines.isEmpty()) {
+          newLines.remove(0);
+        }
+        if (footer != null && !newLines.isEmpty()) {
+          newLines.remove(newLines.size() - 1);
+        }
+        return newLines;
+      }
+    };
+  }
+
+  /**
+   * Returns a predicate that is true for a file whose first line equals {@code header} (when
+   * non-null) and whose last line equals {@code footer} (when non-null).
+   *
+   * <p>An empty file passes only when both {@code header} and {@code footer} are {@code null};
+   * otherwise the predicate returns {@code false} rather than throwing.
+   *
+   * @param header expected first line, or {@code null} to skip the header check
+   * @param footer expected last line, or {@code null} to skip the footer check
+   */
+  private static Predicate<List<String>> haveProperHeaderAndFooter(
+      final String header, final String footer) {
+    return new Predicate<List<String>>() {
+      @Override
+      public boolean apply(List<String> fileLines) {
+        if (fileLines.isEmpty()) {
+          // There is no first or last line to compare against.
+          return header == null && footer == null;
+        }
+        int last = fileLines.size() - 1;
+        return (header == null || fileLines.get(0).equals(header))
+            && (footer == null || fileLines.get(last).equals(footer));
+      }
+    };
+  }
+
+  /** Writes {@code LINES_ARRAY} to a single unsharded file with no header or footer. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteStrings() throws Exception {
+    runTestWrite(LINES_ARRAY, 1);
+  }
+
+  /**
+   * Writes no lines with a shard count of {@code 0}, which makes {@code runTestWrite} apply
+   * neither {@code withoutSharding()} nor {@code withNumShards()}.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteEmptyStringsNoSharding() throws Exception {
+    runTestWrite(NO_LINES_ARRAY, null, null, 0);
+  }
+
+  /** Writes no lines to a single unsharded file with no header or footer. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteEmptyStrings() throws Exception {
+    runTestWrite(NO_LINES_ARRAY, 1);
+  }
+
+  /** Writes {@code LINES_ARRAY} across five shards with no header or footer. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testShardedWrite() throws Exception {
+    final int numShards = 5;
+    runTestWrite(LINES_ARRAY, null, null, numShards);
+  }
+
+  /** Writes {@code LINES_ARRAY} to a single unsharded file with a header and no footer. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithHeader() throws Exception {
+    runTestWrite(LINES_ARRAY, MY_HEADER, null, 1);
+  }
+
+  /** Writes {@code LINES_ARRAY} to a single unsharded file with a footer and no header. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithFooter() throws Exception {
+    runTestWrite(LINES_ARRAY, null, MY_FOOTER, 1);
+  }
+
+  /** Writes {@code LINES_ARRAY} to a single unsharded file with both a header and a footer. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithHeaderAndFooter() throws Exception {
+    runTestWrite(LINES_ARRAY, MY_HEADER, MY_FOOTER, 1);
+  }
+
+  /**
+   * Writes {@code LINES2_ARRAY} through a {@code DrunkWritableByteChannelFactory} and expects
+   * every line to appear twice in the single output file, whose name carries the factory's
+   * suggested suffix. Also checks that the factory is reported in the display data as "DRUNK".
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithWritableByteChannelFactory() throws Exception {
+    String outputName = "file.txt";
+    ResourceId baseDir =
+        FileSystems.matchNewResource(
+            Files.createTempDirectory(tempFolder, "testwrite").toString(), true);
+    String outputPath =
+        baseDir.resolve(outputName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();
+
+    Coder<String> coder = StringUtf8Coder.of();
+    PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder));
+
+    final WritableByteChannelFactory drunkFactory = new DrunkWritableByteChannelFactory();
+    TextIO.Write write =
+        TextIO.write()
+            .to(outputPath)
+            .withoutSharding()
+            .withWritableByteChannelFactory(drunkFactory);
+    assertThat(DisplayData.from(write), hasDisplayItem("writableByteChannelFactory", "DRUNK"));
+
+    input.apply(write);
+
+    p.run();
+
+    // Every input line is expected twice in the output.
+    final List<String> drunkElems = new ArrayList<>(LINES2_ARRAY.length * 2 + 2);
+    for (String elem : LINES2_ARRAY) {
+      Collections.addAll(drunkElems, elem, elem);
+    }
+    ResourceId expectedOutput =
+        baseDir.resolve(
+            outputName + drunkFactory.getSuggestedFilenameSuffix(),
+            ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+    assertOutputFiles(
+        drunkElems.toArray(new String[drunkElems.size()]),
+        null,
+        null,
+        1,
+        expectedOutput,
+        write.inner.getShardTemplate());
+  }
+
+  /**
+   * Checks that prefix, suffix, header, footer, shard template, shard count and the default
+   * channel factory ("UNCOMPRESSED") of a configured write all show up in its display data.
+   */
+  @Test
+  public void testWriteDisplayData() {
+    DisplayData displayData =
+        DisplayData.from(
+            TextIO.write()
+                .to("/foo")
+                .withSuffix("bar")
+                .withShardNameTemplate("-SS-of-NN-")
+                .withNumShards(100)
+                .withFooter("myFooter")
+                .withHeader("myHeader"));
+
+    assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
+    assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
+    assertThat(displayData, hasDisplayItem("fileHeader", "myHeader"));
+    assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
+    assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
+    assertThat(displayData, hasDisplayItem("numShards", 100));
+    assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "UNCOMPRESSED"));
+  }
+
+  /** Checks that a header set after {@code to(...)} is reported as {@code fileHeader}. */
+  @Test
+  public void testWriteDisplayDataValidateThenHeader() {
+    DisplayData displayData = DisplayData.from(TextIO.write().to("foo").withHeader("myHeader"));
+
+    assertThat(displayData, hasDisplayItem("fileHeader", "myHeader"));
+  }
+
+  /** Checks that a footer set after {@code to(...)} is reported as {@code fileFooter}. */
+  @Test
+  public void testWriteDisplayDataValidateThenFooter() {
+    DisplayData displayData = DisplayData.from(TextIO.write().to("foo").withFooter("myFooter"));
+
+    assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
+  }
+
+  @Test
+  public void testGetName() {
+    assertEquals("TextIO.Write", TextIO.write().to("somefile").getName());
+  }
+
+  /** Options exposing an output {@link ValueProvider}; see testRuntimeOptionsNotCalledInApply. */
+  public interface RuntimeTestOptions extends PipelineOptions {
+    ValueProvider<String> getOutput(); // Read by the test and handed to TextIO.write().to(...).
+    void setOutput(ValueProvider<String> value); // Setter paired with getOutput(); unused here.
+  }
+
+  /**
+   * Applies a write whose destination is the {@code output} value provider of freshly created
+   * options, without ever setting that option or running the pipeline. The test passes as long
+   * as building the pipeline does not throw.
+   */
+  @Test
+  public void testRuntimeOptionsNotCalledInApply() throws Exception {
+    // The pipeline is deliberately never run, so switch off the abandoned-node check.
+    p.enableAbandonedNodeEnforcement(false);
+
+    ValueProvider<String> output = PipelineOptionsFactory.as(RuntimeTestOptions.class).getOutput();
+    PCollection<String> input = p.apply(Create.of(""));
+    input.apply(TextIO.write().to(output));
+  }
+}


Mime
View raw message