beam-commits mailing list archives

From "Luke Cwik (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (BEAM-2448) FileBasedSink writing to incorrect path when path prefix has no file component in path (e.g. /tmp/)
Date Wed, 14 Jun 2017 15:51:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Cwik updated BEAM-2448:
----------------------------
    Description: 
This was reported by a user on dev@, original report:
https://lists.apache.org/thread.html/378da40ca7d13e226ca793d2a27af047f9a562f273f6eaa5d677dc4b@%3Cdev.beam.apache.org%3E

This pipeline (where WindowedFilenamePolicy is the one found in org.apache.beam.sdk.io.AvroIOTest) produces files in the wrong directory:
{code}
stringsPCollection
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
    .apply("WritingToOutput",
        TextIO.write().withWindowedWrites()
            .withFilenamePolicy(new WindowedFilenamePolicy("my_pref"))
            .to("/tmp/").withNumShards(1));
{code}

I expected the output files to be written to the /tmp/ directory, but they are not: they are written to the root directory instead, which is unexpected.
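
To make the symptom concrete: assuming the filename policy resolves each generated shard name against the base output directory (an assumption about WindowedFilenamePolicy, whose code is not shown here), a shard lands wherever that directory points. The sketch below uses plain java.nio.file paths rather than Beam's ResourceId, a hypothetical class ShardLocation, and a hypothetical shard name "my_pref-0-of-1"; it assumes a Unix-style default file system.

{code:java}
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical illustration (not Beam code): where a shard ends up for a given base directory. */
public class ShardLocation {

  // Resolves a generated shard name against a base output directory.
  static Path shardPath(String baseDirectory, String shardName) {
    return Paths.get(baseDirectory).resolve(shardName);
  }

  public static void main(String[] args) {
    String shard = "my_pref-0-of-1"; // hypothetical shard name
    System.out.println(shardPath("/tmp", shard)); // expected location: /tmp/my_pref-0-of-1
    System.out.println(shardPath("/", shard));    // observed location: /my_pref-0-of-1
  }
}
{code}

If the extracted directory is the root instead of /tmp, every shard is resolved directly under /, which matches the reported behavior.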

I think the problem is in org.apache.beam.sdk.io.FileBasedSink.ExtractDirectory ...
This main method shows the problem:
{code}
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

public class Test {

  public static void main(String[] args) {
    // Convert the directory-style prefix "/tmp/" into a ResourceId.
    ResourceId res = FileBasedSink.convertToFileResourceIfPossible("/tmp/");
    System.out.println("Resource is " + res + ", current directory is "
        + res.getCurrentDirectory() + ", filename is " + res.getFilename());

    // Minimal anonymous sink (no filename policy, no write operation), used only
    // to inspect the base output directory the sink derives from res.
    FileBasedSink<String> mockFBS =
        new FileBasedSink<String>(StaticValueProvider.of(res), null) {
          @Override
          public FileBasedSink.WriteOperation<String> createWriteOperation() {
            return null;
          }
        };

    // The directory the sink will write under; /tmp is expected here.
    final ValueProvider<ResourceId> provider = mockFBS.getBaseOutputDirectoryProvider();

    System.out.println("BaseOutputProvider is " + provider + ", isAccessible="
        + provider.isAccessible() + ", getValue=" + provider.get());
  }

}
{code}

The output is:
{code}
Resource is /tmp, current directory is //, filename is tmp
BaseOutputProvider is NestedValueProvider{value=StaticValueProvider{value=/tmp}}, isAccessible=true, getValue=//
{code}
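
One possible reading of the output above, assuming the local ResourceId behaves like java.nio.file.Path (an assumption; the report does not show the code of ExtractDirectory): "/tmp/" loses its trailing slash, becomes the file /tmp, and its parent, the root, is taken as the output directory. The hypothetical helpers naiveDirectory and directoryOf below contrast that naive extraction with one that treats a trailing '/' as marking a directory. This is a sketch on a Unix-style file system, not Beam's actual fix.

{code:java}
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helpers (not Beam code) for extracting an output directory from a prefix. */
public class DirectoryOfPrefix {

  // Naive: treat the whole prefix as a file and return its parent
  // (empty string when there is no parent, e.g. a bare relative name).
  // Paths.get("/tmp/") normalizes to "/tmp", whose parent is "/".
  static String naiveDirectory(String prefix) {
    Path parent = Paths.get(prefix).getParent();
    return parent == null ? "" : parent.toString();
  }

  // Trailing-slash aware: a prefix ending in '/' already names a directory,
  // so keep it; otherwise fall back to the parent of the file component.
  static String directoryOf(String prefix) {
    if (prefix.endsWith("/")) {
      return Paths.get(prefix).toString();
    }
    return naiveDirectory(prefix);
  }

  public static void main(String[] args) {
    System.out.println(naiveDirectory("/tmp/"));     // prints /
    System.out.println(directoryOf("/tmp/"));        // prints /tmp
    System.out.println(directoryOf("/tmp/my_pref")); // prints /tmp
  }
}
{code}

The trailing-separator check has to happen on the original string, because once the prefix is parsed into a Path the information that the caller meant a directory is gone.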

  was:
(Same as the description above, but without the link to the original dev@ report.)


> FileBasedSink writing to incorrect path when path prefix has no file component in path (e.g. /tmp/)
> ----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-2448
>                 URL: https://issues.apache.org/jira/browse/BEAM-2448
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Luke Cwik
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
