beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] beam git commit: [BEAM-2753] Fixes translation of WriteFiles side inputs
Date Wed, 30 Aug 2017 23:29:19 GMT
Repository: beam
Updated Branches:
  refs/heads/master 1cd87e325 -> 097aec7a3


[BEAM-2753] Fixes translation of WriteFiles side inputs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/783f26f3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/783f26f3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/783f26f3

Branch: refs/heads/master
Commit: 783f26f3a80a3f2a9d5a0fafc33778e046fe6b36
Parents: 1cd87e3
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Fri Aug 25 14:49:07 2017 -0700
Committer: Eugene Kirpichov <ekirpichov@gmail.com>
Committed: Wed Aug 30 16:29:05 2017 -0700

----------------------------------------------------------------------
 .../core/construction/PipelineTranslation.java  |  55 ++++++----
 .../direct/WriteWithShardingFactory.java        |  13 +--
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 106 +++++++++++++------
 3 files changed, 112 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/783f26f3/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index d928338..8a2faf3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -152,30 +152,24 @@ public class PipelineTranslation {
     RunnerApi.FunctionSpec transformSpec = transformProto.getSpec();
 
     // By default, no "additional" inputs, since that is an SDK-specific thing.
-    // Only ParDo really separates main from side inputs
+    // Only ParDo and WriteFiles really separate main from side inputs
     Map<TupleTag<?>, PValue> additionalInputs = Collections.emptyMap();
 
-    // TODO: ParDoTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
+    // TODO: ParDoTranslation should own it - https://issues.apache.org/jira/browse/BEAM-2674
     if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) {
-      RunnerApi.ParDoPayload payload =
-          RunnerApi.ParDoPayload.parseFrom(transformSpec.getPayload());
-
-      List<PCollectionView<?>> views = new ArrayList<>();
-      for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry :
-          payload.getSideInputsMap().entrySet()) {
-        String localName = sideInputEntry.getKey();
-        RunnerApi.SideInput sideInput = sideInputEntry.getValue();
-        PCollection<?> pCollection =
-            (PCollection<?>) checkNotNull(rehydratedInputs.get(new TupleTag<>(localName)));
-        views.add(
-            ParDoTranslation.viewFromProto(
-                sideInputEntry.getValue(),
-                sideInputEntry.getKey(),
-                pCollection,
-                transformProto,
-                rehydratedComponents));
-      }
-      additionalInputs = PCollectionViews.toAdditionalInputs(views);
+      RunnerApi.ParDoPayload payload = RunnerApi.ParDoPayload.parseFrom(transformSpec.getPayload());
+      additionalInputs =
+          sideInputMapToAdditionalInputs(
+              transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap());
+    }
+
+    // TODO: WriteFilesTranslation should own it - https://issues.apache.org/jira/browse/BEAM-2674
+    if (transformSpec.getUrn().equals(PTransformTranslation.WRITE_FILES_TRANSFORM_URN)) {
+      RunnerApi.WriteFilesPayload payload =
+          RunnerApi.WriteFilesPayload.parseFrom(transformSpec.getPayload());
+      additionalInputs =
+          sideInputMapToAdditionalInputs(
+              transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap());
     }
 
     // TODO: CombineTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
@@ -216,6 +210,25 @@ public class PipelineTranslation {
     }
   }
 
+  private static Map<TupleTag<?>, PValue> sideInputMapToAdditionalInputs(
+      RunnerApi.PTransform transformProto,
+      RehydratedComponents rehydratedComponents,
+      Map<TupleTag<?>, PValue> rehydratedInputs,
+      Map<String, RunnerApi.SideInput> sideInputsMap)
+      throws IOException {
+    List<PCollectionView<?>> views = new ArrayList<>();
+    for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry : sideInputsMap.entrySet()) {
+      String localName = sideInputEntry.getKey();
+      RunnerApi.SideInput sideInput = sideInputEntry.getValue();
+      PCollection<?> pCollection =
+          (PCollection<?>) checkNotNull(rehydratedInputs.get(new TupleTag<>(localName)));
+      views.add(
+          ParDoTranslation.viewFromProto(
+              sideInput, localName, pCollection, transformProto, rehydratedComponents));
+    }
+    return PCollectionViews.toAdditionalInputs(views);
+  }
+
   // A primitive transform is one with outputs that are not in its input and also
   // not produced by a subtransform.
   private static boolean isPrimitive(RunnerApi.PTransform transformProto) {

http://git-wip-us.apache.org/repos/asf/beam/blob/783f26f3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 3557c5d..605ef64 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -24,12 +24,10 @@ import com.google.common.base.Suppliers;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.WriteFilesTranslation;
-import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -63,16 +61,15 @@ class WriteWithShardingFactory<InputT>
       AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>>
           transform) {
     try {
-      List<PCollectionView<?>> sideInputs =
-          WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
-      FileBasedSink sink = WriteFilesTranslation.getSink(transform);
-      WriteFiles<InputT, ?, ?> replacement = WriteFiles.to(sink).withSideInputs(sideInputs);
+      WriteFiles<InputT, ?, ?> replacement =
+          WriteFiles.to(WriteFilesTranslation.getSink(transform))
+              .withSideInputs(WriteFilesTranslation.getDynamicDestinationSideInputs(transform))
+              .withSharding(new LogElementShardsWithDrift<InputT>());
       if (WriteFilesTranslation.isWindowedWrites(transform)) {
         replacement = replacement.withWindowedWrites();
       }
       return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
-          replacement.withSharding(new LogElementShardsWithDrift<InputT>()));
+          PTransformReplacements.getSingletonMainInput(transform), replacement);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/783f26f3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 8870dd8..58af1d1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
+import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
@@ -28,13 +29,14 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -63,7 +65,6 @@ import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -525,7 +526,7 @@ public class AvroIOTest {
               outputFileHints.getSuggestedFilenameSuffix());
       return outputFilePrefix
           .getCurrentDirectory()
-          .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+          .resolve(filename, RESOLVE_FILE);
     }
 
     @Override
@@ -709,16 +710,20 @@ public class AvroIOTest {
     public FilenamePolicy getFilenamePolicy(String destination) {
       return DefaultFilenamePolicy.fromStandardParameters(
           StaticValueProvider.of(
-              baseDir.resolve("file_" + destination + ".txt", StandardResolveOptions.RESOLVE_FILE)),
+              baseDir.resolve("file_" + destination + ".txt", RESOLVE_FILE)),
           null,
           null,
           false);
     }
   }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void testDynamicDestinations() throws Exception {
+  private enum Sharding {
+    RUNNER_DETERMINED,
+    WITHOUT_SHARDING,
+    FIXED_3_SHARDS
+  }
+
+  private void testDynamicDestinationsWithSharding(Sharding sharding) throws Exception {
     ResourceId baseDir =
         FileSystems.matchNewResource(
             Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testDynamicDestinations")
@@ -726,13 +731,14 @@ public class AvroIOTest {
             true);
 
     List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
-    List<GenericRecord> expectedElements = Lists.newArrayListWithExpectedSize(elements.size());
+    Multimap<String, GenericRecord> expectedElements = ArrayListMultimap.create();
     Map<String, String> schemaMap = Maps.newHashMap();
     for (String element : elements) {
       String prefix = element.substring(0, 1);
       String jsonSchema = schemaFromPrefix(prefix);
       schemaMap.put(prefix, jsonSchema);
-      expectedElements.add(createRecord(element, prefix, new Schema.Parser().parse(jsonSchema)));
+      expectedElements.put(
+          prefix, createRecord(element, prefix, new Schema.Parser().parse(jsonSchema)));
     }
     PCollectionView<Map<String, String>> schemaView =
         writePipeline
@@ -741,38 +747,72 @@ public class AvroIOTest {
 
     PCollection<String> input =
         writePipeline.apply("createInput", Create.of(elements).withCoder(StringUtf8Coder.of()));
-    input.apply(
+    AvroIO.TypedWrite<String, GenericRecord> write =
         AvroIO.<String>writeCustomTypeToGenericRecords()
             .to(new TestDynamicDestinations(baseDir, schemaView))
-            .withoutSharding()
-            .withTempDirectory(baseDir));
+            .withTempDirectory(baseDir);
+
+    switch (sharding) {
+      case RUNNER_DETERMINED:
+        break;
+      case WITHOUT_SHARDING:
+        write = write.withoutSharding();
+        break;
+      case FIXED_3_SHARDS:
+        write = write.withNumShards(3);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown sharding " + sharding);
+    }
+
+    input.apply(write);
     writePipeline.run();
 
     // Validate that the data written matches the expected elements in the expected order.
 
-    List<String> prefixes = Lists.newArrayList();
-    for (String element : elements) {
-      prefixes.add(element.substring(0, 1));
-    }
-    prefixes = ImmutableSet.copyOf(prefixes).asList();
-
-    List<GenericRecord> actualElements = new ArrayList<>();
-    for (String prefix : prefixes) {
-      File expectedFile =
-          new File(
-              baseDir
-                  .resolve(
-                      "file_" + prefix + ".txt-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)
-                  .toString());
-      assertTrue("Expected output file " + expectedFile.getAbsolutePath(), expectedFile.exists());
-      Schema schema = new Schema.Parser().parse(schemaFromPrefix(prefix));
-      try (DataFileReader<GenericRecord> reader =
-          new DataFileReader<>(expectedFile, new GenericDatumReader<GenericRecord>(schema))) {
-        Iterators.addAll(actualElements, reader);
+    for (String prefix : expectedElements.keySet()) {
+      String shardPattern;
+      switch (sharding) {
+        case RUNNER_DETERMINED:
+          shardPattern = "*";
+          break;
+        case WITHOUT_SHARDING:
+          shardPattern = "00000-of-00001";
+          break;
+        case FIXED_3_SHARDS:
+          shardPattern = "*-of-00003";
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown sharding " + sharding);
       }
-      expectedFile.delete();
+      String expectedFilepattern =
+          baseDir.resolve("file_" + prefix + ".txt-" + shardPattern, RESOLVE_FILE).toString();
+
+      PCollection<GenericRecord> records =
+          readPipeline.apply(
+              "read_" + prefix,
+              AvroIO.readGenericRecords(schemaFromPrefix(prefix)).from(expectedFilepattern));
+      PAssert.that(records).containsInAnyOrder(expectedElements.get(prefix));
     }
-    assertThat(actualElements, containsInAnyOrder(expectedElements.toArray()));
+    readPipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinationsRunnerDeterminedSharding() throws Exception {
+    testDynamicDestinationsWithSharding(Sharding.RUNNER_DETERMINED);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinationsWithoutSharding() throws Exception {
+    testDynamicDestinationsWithSharding(Sharding.WITHOUT_SHARDING);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinationsWithNumShards() throws Exception {
+    testDynamicDestinationsWithSharding(Sharding.FIXED_3_SHARDS);
   }
 
   @Test


Mime
View raw message