apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2008) Create hdfs file input module
Date Thu, 10 Mar 2016 09:05:40 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15188933#comment-15188933 ]

ASF GitHub Bot commented on APEXMALHAR-2008:
--------------------------------------------

Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/207#discussion_r55650442
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
    +import com.datatorrent.lib.io.block.HDFSBlockMetadata;
    +
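    +/**
    + * HDFSFileSplitter extends FileSplitterInput to ignore HDFS temporary files
    + * created while copying (*._COPYING_), skip paths containing characters that
    + * HDFS does not support (":"), and optionally mark a file's blocks to be
    + * read in sequence.
    + */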
    +public class HDFSFileSplitter extends FileSplitterInput
    +{
    +  private boolean sequentialFileRead;
    +
    +  public HDFSFileSplitter()
    +  {
    +    super();
    +    super.setScanner(new HDFSScanner());
    +  }
    +
    +  @Override
    +  protected FileMetadata createFileMetadata(FileInfo fileInfo)
    +  {
    +    return new HDFSFileMetaData(fileInfo.getFilePath());
    +  }
    +
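    +  /**
    +   * Builds the file metadata and sets the relative path: when only a file
    +   * name was given as input, the relative path is just the file name; when
    +   * a directory was given, the directory name is prefixed so the directory
    +   * structure is preserved.
    +   */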
    +  @Override
    +  protected HDFSFileMetaData buildFileMetadata(FileInfo fileInfo) throws IOException
    +  {
    +    FileMetadata metadata = super.buildFileMetadata(fileInfo);
    +    HDFSFileMetaData hdfsFileMetaData = (HDFSFileMetaData)metadata;
    +
    +    Path path = new Path(fileInfo.getFilePath());
    +    FileStatus status = getFileStatus(path);
    +    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
    +      hdfsFileMetaData.setRelativePath(status.getPath().getName());
    +    } else {
    +      String relativePath = getRelativePathWithFolderName(fileInfo);
    +      hdfsFileMetaData.setRelativePath(relativePath);
    +    }
    +    return hdfsFileMetaData;
    +  }
    +
    +  /*
    +   * When a directory was given as input, prefix the directory name to the
    +   * relative paths of its sub-items so the structure is preserved.
    +   */
    +  private String getRelativePathWithFolderName(FileInfo fileInfo)
    +  {
    +    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
    +    return parentDir + File.separator + fileInfo.getRelativeFilePath();
    +  }
    +
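    +  /**
    +   * Creates block metadata that carries the sequential-read flag so a
    +   * downstream block reader can process a file's blocks in order.
    +   */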
    +  @Override
    +  protected HDFSBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
    +  {
    +    HDFSBlockMetadata blockMetadata = new HDFSBlockMetadata(fileMetadata.getFilePath());
    +    blockMetadata.setReadBlockInSequence(sequentialFileRead);
    +    return blockMetadata;
    +  }
    +
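    +  /**
    +   * Narrows the return type to HDFSBlockMetadata; the instance comes from
    +   * the createBlockMetadata(FileMetadata) override above.
    +   */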
    +  @Override
    +  protected HDFSBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock,
    +      int blockNumber, FileMetadata fileMetadata, boolean isLast)
    +  {
    +    FileBlockMetadata metadata = super.buildBlockMetadata(pos, lengthOfFileInBlock,
    +        blockNumber, fileMetadata, isLast);
    +    return (HDFSBlockMetadata)metadata;
    +  }
    +
    +  public boolean isSequentialFileRead()
    +  {
    +    return sequentialFileRead;
    +  }
    +
    +  public void setSequentialFileRead(boolean sequentialFileRead)
    +  {
    +    this.sequentialFileRead = sequentialFileRead;
    +  }
    +
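    +  /**
    +   * Directory scanner that skips HDFS copy-in-progress temporary files
    +   * (*._COPYING_) and paths containing the unsupported character ":".
    +   */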
    +  public static class HDFSScanner extends TimeBasedDirectoryScanner
    +  {
    +    protected static final String HDFS_TEMP_FILE = ".*\\._COPYING_";
    +    protected static final String UNSUPPORTED_CHARACTER = ":";
    +    private transient Pattern ignoreRegex;
    +
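    +    /** Compiles the temp-file ignore pattern; the field is transient, so this runs again after recovery. */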
    +    @Override
    +    public void setup(OperatorContext context)
    +    {
    +      super.setup(context);
    +      ignoreRegex = Pattern.compile(HDFS_TEMP_FILE);
    +    }
    +
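    +    /**
    +     * Applies the base class checks and additionally rejects temporary
    +     * files and paths with unsupported characters.
    +     */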
    +    @Override
    +    protected boolean acceptFile(String filePathStr)
    +    {
    +      boolean accepted = super.acceptFile(filePathStr);
    +      if (containsUnsupportedCharacters(filePathStr) || isTempFile(filePathStr)) {
    +        return false;
    +      }
    +      return accepted;
    +    }
    +
    +    private boolean isTempFile(String filePathStr)
    +    {
    +      String fileName = new Path(filePathStr).getName();
    +      if (ignoreRegex != null) {
    +        Matcher matcher = ignoreRegex.matcher(fileName);
    +        if (matcher.matches()) {
    +          return true;
    +        }
    +      }
    +      return false;
    +    }
    +
    +    private boolean containsUnsupportedCharacters(String filePathStr)
    +    {
    +      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTER);
    +    }
    +  }
    +
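    +  /**
    +   * File metadata that also records the path of the file relative to the
    +   * scanned input directory.
    +   */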
    +  public static class HDFSFileMetaData extends FileMetadata
    +  {
    +    private String relativePath;
    +
    +    protected HDFSFileMetaData()
    --- End diff --
    
    The super class is doing that; I am just carrying over the same thing.


> Create hdfs file input module 
> ------------------------------
>
>                 Key: APEXMALHAR-2008
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2008
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>            Priority: Minor
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> To read HDFS files in parallel using Apex, we normally use the FileSplitter and
> FileReader operators. It would be a good idea to combine those operators into a
> single module. Having a module would give us a readily usable set of operators
> for reading HDFS files.
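
A minimal sketch of how such a module might compose the two operators. The
module class name and the choice of FSSliceReader are assumptions for
illustration; the ports used (blocksMetadataOutput, blocksMetadataInput,
messages) follow the Malhar block-reader pattern and may differ from the
final module in this PR.

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.Module;
    import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
    import com.datatorrent.lib.io.block.FSSliceReader;
    import com.datatorrent.lib.io.fs.HDFSFileSplitter;
    import com.datatorrent.netlet.util.Slice;

    // Hypothetical module wiring the splitter to a block reader (sketch only).
    public class HdfsFileInputModuleSketch implements Module
    {
      // Proxy port that re-exposes the reader's records at the module boundary.
      public final transient ProxyOutputPort<ReaderRecord<Slice>> data = new ProxyOutputPort<>();

      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        HDFSFileSplitter fileSplitter = dag.addOperator("FileSplitter", new HDFSFileSplitter());
        FSSliceReader blockReader = dag.addOperator("BlockReader", new FSSliceReader());

        // The splitter emits per-block metadata; the reader fetches each block's bytes.
        dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, blockReader.blocksMetadataInput);
        data.set(blockReader.messages);
      }
    }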



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
