apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2008) Create hdfs file input module
Date Fri, 11 Mar 2016 01:27:30 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15190284#comment-15190284 ]

ASF GitHub Bot commented on APEXMALHAR-2008:
--------------------------------------------

Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55778161
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +
    +/**
    + * HDFSFileSplitter extends {@link FileSplitterInput} to:
    + * 1. Add a relative path to the file metadata.
    + * 2. Ignore HDFS temp files (files with the _COPYING_ extension).
    + * 3. Set the sequential read option on readers.
    + */
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequentialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * As the folder name was given as input for the copy, prefix the folder name to the sub-items to copy.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
    +  @Override
    +  protected FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    FileBlockMetadata blockMetadata = new FileBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadata.setReadBlockInSequence(sequentialFileRead);
    +    return blockMetadata;
    +  }
    +
    +  public boolean isSequentialFileRead()
    +  {
    +    return sequentialFileRead;
    +  }
    +
    +  public void setSequentialFileRead(boolean sequentialFileRead)
    +  {
    +    this.sequentialFileRead = sequentialFileRead;
    +  }
    +
    +  /**
    +   * HDFSScanner extends {@link TimeBasedDirectoryScanner} to ignore HDFS temporary files
    +   * and files containing unsupported characters. 
    +   */
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTER = ":";
    +    private transient Pattern ignoreRegex;
    --- End diff --
    
    One way to handle this could be to expose ignoreRegex as a property of the module, with the default being _COPYING_. It could be useful for ignoring other file extensions as well. It could also be part of the superclass if it applies to other types of file readers.
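
For illustration, a minimal sketch of that suggestion, assuming a bean-style property on HDFSScanner and the acceptFile hook of TimeBasedDirectoryScanner; the property name ignoreFilePatternRegex and the lazy compile are hypothetical, not the merged implementation:

    public static class HDFSScanner extends TimeBasedDirectoryScanner
    {
      // Configurable regex for files to skip; defaults to HDFS temporary copy files.
      // The property name is hypothetical, not taken from the pull request.
      private String ignoreFilePatternRegex = ".*._COPYING_";
      private transient Pattern ignoreRegex;

      @Override
      protected boolean acceptFile(String filePathStr)
      {
        if (ignoreRegex == null) {
          ignoreRegex = Pattern.compile(ignoreFilePatternRegex); // transient, so recompile after deserialization
        }
        if (ignoreRegex.matcher(filePathStr).matches()) {
          return false; // skip HDFS temp files and anything else the pattern matches
        }
        return super.acceptFile(filePathStr);
      }

      public String getIgnoreFilePatternRegex()
      {
        return ignoreFilePatternRegex;
      }

      public void setIgnoreFilePatternRegex(String ignoreFilePatternRegex)
      {
        this.ignoreFilePatternRegex = ignoreFilePatternRegex;
      }
    }

A property like this could then be set from the application configuration to ignore other extensions, and hoisting it into the superclass would make it available to other file readers, as the comment suggests.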


> Create hdfs file input module 
> ------------------------------
>
>                 Key: APEXMALHAR-2008
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2008
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>            Priority: Minor
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> To read HDFS files in parallel using Apex, we normally use the FileSplitter and FileReader operators. It would be a good idea to combine those operators as a unit in a module. Having a module would give us a readily usable set of operators for reading HDFS files.
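
As a rough sketch of the module idea, assuming Apex's Module API (com.datatorrent.api.Module) and the existing FSSliceReader block reader; the class name HDFSFileInputModule, the proxy port, and the stream names are illustrative, not the implementation tracked by this ticket:

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.Module;
    import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    import com.datatorrent.lib.io.block.FSSliceReader;
    import com.datatorrent.netlet.util.Slice;

    public class HDFSFileInputModule implements Module
    {
      // Proxy port re-exposing the block reader's output as the module's output.
      public final transient ProxyOutputPort<ReaderRecord<Slice>> messages = new ProxyOutputPort<>();

      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        // The splitter discovers files and emits block metadata;
        // the block reader consumes that metadata and reads blocks in parallel.
        HDFSFileSplitter splitter = dag.addOperator("FileSplitter", new HDFSFileSplitter());
        FSSliceReader reader = dag.addOperator("BlockReader", new FSSliceReader());
        dag.addStream("BlockMetadata", splitter.blocksMetadataOutput, reader.blocksMetadataInput);
        messages.set(reader.messages);
      }
    }

Wrapping the pair this way would give applications a single unit to drop into a DAG, which is the readily usable set of operators the ticket asks for.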



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
