beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [06/50] beam git commit: [BEAM-2276] Cleanups on the windowed DefaultFilenamePolicy
Date Thu, 08 Jun 2017 01:34:55 GMT
[BEAM-2276] Cleanups on the windowed DefaultFilenamePolicy


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e764167f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e764167f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e764167f

Branch: refs/heads/DSL_SQL
Commit: e764167f40e603ac00ac80758cac0108bcc49769
Parents: 6d64c6e
Author: Reuven Lax <relax@google.com>
Authored: Thu May 25 23:42:17 2017 -0700
Committer: Jean-Baptiste Onofré <jbonofre@apache.org>
Committed: Tue Jun 6 11:08:35 2017 +0200

----------------------------------------------------------------------
 .../construction/PTransformMatchersTest.java    |   8 +-
 .../direct/WriteWithShardingFactoryTest.java    |  23 +++--
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   2 +-
 .../beam/sdk/io/DefaultFilenamePolicy.java      | 103 +++++--------------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |   2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |   2 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  11 +-
 .../beam/sdk/io/DefaultFilenamePolicyTest.java  |  26 ++---
 .../java/org/apache/beam/sdk/io/TextIOTest.java |   5 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |   2 +-
 10 files changed, 69 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index cfea62f..2497598 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -504,8 +504,12 @@ public class PTransformMatchersTest implements Serializable {
   @Test
   public void writeWithRunnerDeterminedSharding() {
     ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory
*/);
-    FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
-        StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE,
"");
+    FilenamePolicy policy =
+        DefaultFilenamePolicy.constructUsingStandardParameters(
+            StaticValueProvider.of(outputDirectory),
+            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
+            "",
+            false);
     WriteFiles<Integer> write =
         WriteFiles.to(
             new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy)
{

http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 5c4fea1..a88d95e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -129,15 +129,20 @@ public class WriteWithShardingFactoryTest {
   @Test
   public void withNoShardingSpecifiedReturnsNewTransform() {
     ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
-    FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
-        StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE,
"");
-    WriteFiles<Object> original = WriteFiles.to(
-        new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy)
{
-          @Override
-          public WriteOperation<Object> createWriteOperation() {
-            throw new IllegalArgumentException("Should not be used");
-          }
-        });
+    FilenamePolicy policy =
+        DefaultFilenamePolicy.constructUsingStandardParameters(
+            StaticValueProvider.of(outputDirectory),
+            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
+            "",
+            false);
+    WriteFiles<Object> original =
+        WriteFiles.to(
+            new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy)
{
+              @Override
+              public WriteOperation<Object> createWriteOperation() {
+                throw new IllegalArgumentException("Should not be used");
+              }
+            });
     @SuppressWarnings("unchecked")
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 6af0e79..4143db2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -451,7 +451,7 @@ public class AvroIO {
       FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
       if (usedFilenamePolicy == null) {
         usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
-            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix());
+            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
       }
 
       WriteFiles<T> write = WriteFiles.to(

http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 5073854..f9e4ac4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +55,7 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
   private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class);
 
   /** The default sharding name template used in {@link #constructUsingStandardParameters}.
*/
-  public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+  public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
 
   /** The default windowed sharding name template used when writing windowed files.
    *  This is used as default in cases when user did not specify shard template to
@@ -63,27 +64,12 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
    *  windowed and non-windowed file names.
    */
   private static final String DEFAULT_WINDOWED_SHARD_TEMPLATE =
-      "P-W" + DEFAULT_SHARD_TEMPLATE;
-
-  /*
-   * pattern for only non-windowed file names
-   */
-  private static final String NON_WINDOWED_ONLY_PATTERN = "S+|N+";
-
-  /*
-   * pattern for only windowed file names
-   */
-  private static final String WINDOWED_ONLY_PATTERN = "P|W";
+      "W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
 
   /*
    * pattern for both windowed and non-windowed file names
    */
-  private static final String TEMPLATE_PATTERN = "(" + NON_WINDOWED_ONLY_PATTERN + "|"
-   + WINDOWED_ONLY_PATTERN + ")";
-
-  // Pattern that matches shard placeholders within a shard template.
-  private static final Pattern SHARD_FORMAT_RE = Pattern.compile(TEMPLATE_PATTERN);
-  private static final Pattern WINDOWED_FORMAT_RE = Pattern.compile(WINDOWED_ONLY_PATTERN);
+  private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|W|P)");
 
   /**
    * Constructs a new {@link DefaultFilenamePolicy}.
@@ -104,19 +90,23 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
    *
    * <p>Any filename component of the provided resource will be used as the filename
prefix.
    *
-   * <p>If provided, the shard name template will be used; otherwise {@link #DEFAULT_SHARD_TEMPLATE}
-   * will be used for non-windowed file names and {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE}
will
-   * be used for windowed file names.
+   * <p>If provided, the shard name template will be used; otherwise
+   * {@link #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} will be used for non-windowed file names
and
+   * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will be used for windowed file names.
    *
    * <p>If provided, the suffix will be used; otherwise the files will have an empty
suffix.
    */
   public static DefaultFilenamePolicy constructUsingStandardParameters(
       ValueProvider<ResourceId> outputPrefix,
       @Nullable String shardTemplate,
-      @Nullable String filenameSuffix) {
+      @Nullable String filenameSuffix,
+      boolean windowedWrites) {
+    // Pick the appropriate default policy based on whether windowed writes are being performed.
+    String defaultTemplate =
+        windowedWrites ? DEFAULT_WINDOWED_SHARD_TEMPLATE : DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
     return new DefaultFilenamePolicy(
         NestedValueProvider.of(outputPrefix, new ExtractFilename()),
-        firstNonNull(shardTemplate, DEFAULT_SHARD_TEMPLATE),
+        firstNonNull(shardTemplate, defaultTemplate),
         firstNonNull(filenameSuffix, ""));
   }
 
@@ -124,19 +114,6 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
   private final String shardTemplate;
   private final String suffix;
 
-  /*
-   * Checks whether given template contains enough information to form
-   * meaningful windowed file names - ie whether it uses pane and window
-   * info.
-   */
-  static boolean isWindowedTemplate(String template){
-    if (template != null){
-      Matcher m = WINDOWED_FORMAT_RE.matcher(template);
-      return m.find();
-    }
-    return false;
-  }
-
   /**
    * Constructs a fully qualified name from components.
    *
@@ -191,51 +168,23 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
     return sb.toString();
   }
 
-  static String constructName(String prefix, String shardTemplate, String suffix, int shardNum,
-      int numShards) {
-    return constructName(prefix, shardTemplate, suffix, shardNum, numShards, null, null);
-  }
-
   @Override
   @Nullable
   public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context,
       String extension) {
-    String filename =
-        constructName(
-            prefix.get(), shardTemplate, suffix, context.getShardNumber(), context.getNumShards())
-        + extension;
+    String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
+        context.getNumShards(), null, null) + extension;
     return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
   }
 
   @Override
   public ResourceId windowedFilename(ResourceId outputDirectory,
       WindowedContext context, String extension) {
-
-    boolean shardTemplateProvidedByUser = !this.shardTemplate.equals(DEFAULT_SHARD_TEMPLATE);
-
-    if (shardTemplateProvidedByUser){
-      boolean isWindowed = isWindowedTemplate(this.shardTemplate);
-      if (!isWindowed){
-        LOG.info("Template you provided {} does not have enough information to create"
-            + "meaningful windowed file names. Consider using P and W in your template",
-            this.shardTemplate);
-      }
-    }
-
     final PaneInfo paneInfo = context.getPaneInfo();
     String paneStr = paneInfoToString(paneInfo);
     String windowStr = windowToString(context.getWindow());
-
-    String templateToUse = shardTemplate;
-    if (!shardTemplateProvidedByUser){
-      LOG.info("User did not provide shard template. For creating windowed file names "
-          + "default template {} will be used", DEFAULT_WINDOWED_SHARD_TEMPLATE);
-      templateToUse = DEFAULT_WINDOWED_SHARD_TEMPLATE;
-    }
-
-    String filename = constructName(prefix.get(), templateToUse, suffix,
-        context.getShardNumber(), context.getNumShards(), paneStr, windowStr)
-        + extension;
+    String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
+        context.getNumShards(), paneStr, windowStr) + extension;
     return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
   }
 
@@ -248,18 +197,20 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
     }
     if (window instanceof IntervalWindow) {
       IntervalWindow iw = (IntervalWindow) window;
-      return String.format("IntervalWindow-%s-%s", iw.start().toString(),
-          iw.end().toString());
+      return String.format("%s-%s", iw.start().toString(), iw.end().toString());
     }
     return window.toString();
   }
 
-  private String paneInfoToString(PaneInfo paneInfo){
-    long currentPaneIndex = (paneInfo == null ? -1L
-        : paneInfo.getIndex());
-    boolean firstPane = (paneInfo == null ? false : paneInfo.isFirst());
-    boolean lastPane = (paneInfo == null ? false : paneInfo.isLast());
-    return String.format("pane-%s-%b-%b", currentPaneIndex, firstPane, lastPane);
+  private String paneInfoToString(PaneInfo paneInfo) {
+    String paneString = String.format("pane-%d", paneInfo.getIndex());
+    if (paneInfo.getTiming() == Timing.LATE) {
+      paneString = String.format("%s-late", paneString);
+    }
+    if (paneInfo.isLast()) {
+      paneString = String.format("%s-last", paneString);
+    }
+    return paneString;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index c274595..e288075 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -559,7 +559,7 @@ public class TFRecordIO {
       super(
           outputPrefix,
           DefaultFilenamePolicy.constructUsingStandardParameters(
-              outputPrefix, shardTemplate, suffix),
+              outputPrefix, shardTemplate, suffix, false),
           writableByteChannelFactory(compressionType));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index afb5849..f1eb7c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -440,7 +440,7 @@ public class TextIO {
       FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
       if (usedFilenamePolicy == null) {
         usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
-            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix());
+            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
       }
       WriteFiles<String> write =
           WriteFiles.to(

http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index d71f2f7..6d01d32 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -479,7 +479,8 @@ public class AvroIOTest {
     p.run();
 
     String shardNameTemplate =
-        firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE);
+        firstNonNull(write.getShardTemplate(),
+            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
 
     assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
   }
@@ -493,7 +494,13 @@ public class AvroIOTest {
       expectedFiles.add(
           new File(
               DefaultFilenamePolicy.constructName(
-                  outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards)));
+                  outputFilePrefix,
+                  shardNameTemplate,
+                  "" /* no suffix */,
+                  i,
+                  numShards,
+                  null,
+                  null)));
     }
 
     List<String> actualElements = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
index 787403b..217420c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
@@ -18,10 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName;
-import static org.apache.beam.sdk.io.DefaultFilenamePolicy.isWindowedTemplate;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,36 +33,25 @@ public class DefaultFilenamePolicyTest {
   @Test
   public void testConstructName() {
     assertEquals("output-001-of-123.txt",
-        constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
+        constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
 
     assertEquals("out.txt/part-00042",
-        constructName("out.txt", "/part-SSSSS", "", 42, 100));
+        constructName("out.txt", "/part-SSSSS", "", 42, 100, null, null));
 
     assertEquals("out.txt",
-        constructName("ou", "t.t", "xt", 1, 1));
+        constructName("ou", "t.t", "xt", 1, 1, null, null));
 
     assertEquals("out0102shard.txt",
-        constructName("out", "SSNNshard", ".txt", 1, 2));
+        constructName("out", "SSNNshard", ".txt", 1, 2, null, null));
 
     assertEquals("out-2/1.part-1-of-2.txt",
-        constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
+        constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null));
   }
 
   @Test
   public void testConstructNameWithLargeShardCount() {
     assertEquals("out-100-of-5000.txt",
-        constructName("out", "-SS-of-NN", ".txt", 100, 5000));
-  }
-
-  @Test
-  public void testIsWindowedTemplate(){
-    assertTrue(isWindowedTemplate("-SSS-of-NNN-P-W"));
-    assertTrue(isWindowedTemplate("-SSS-of-NNN-W"));
-    assertTrue(isWindowedTemplate("-SSS-of-NNN-P"));
-    assertTrue(isWindowedTemplate("W-SSS-of-NNN"));
-
-    assertFalse(isWindowedTemplate("-SSS-of-NNN"));
-    assertFalse(isWindowedTemplate("-SSS-of-lp"));
+        constructName("out", "-SS-of-NN", ".txt", 100, 5000, null, null));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 6c7a53f..9468893 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -312,7 +312,8 @@ public class TextIOTest {
     p.run();
 
     assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
-        firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE));
+        firstNonNull(write.getShardTemplate(),
+            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE));
   }
 
   public static void assertOutputFiles(
@@ -337,7 +338,7 @@ public class TextIOTest {
             new File(
                 rootLocation.toString(),
                 DefaultFilenamePolicy.constructName(
-                    outputName, shardNameTemplate, "", i, numShards)));
+                    outputName, shardNameTemplate, "", i, numShards, null, null)));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 60075a7..6ae83f2 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -41,7 +41,7 @@ class XmlSink<T> extends FileBasedSink<T> {
 
   private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) {
     return DefaultFilenamePolicy.constructUsingStandardParameters(
-        spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION);
+        spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION, false);
   }
 
   XmlSink(XmlIO.Write<T> spec) {


Mime
View raw message