apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2013) HDFS output module for file copy
Date Mon, 21 Mar 2016 06:03:25 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15203772#comment-15203772 ]

ASF GitHub Bot commented on APEXMALHAR-2013:
--------------------------------------------

Github user yogidevendra commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/216#discussion_r56784713
  
    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/BlockWriter.java ---
    @@ -0,0 +1,210 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.commons.lang.mutable.MutableLong;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.google.common.collect.Lists;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.DefaultPartition;
    +import com.datatorrent.api.Partitioner;
    +import com.datatorrent.lib.counters.BasicCounters;
    +import com.datatorrent.lib.io.block.AbstractBlockReader;
    +import com.datatorrent.lib.io.block.BlockMetadata;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Writes a block to the appFS (HDFS on which app is running). This is temporary
    + * write to HDFS to handle large files.
    + */
    +public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<Slice>>
    +    implements Partitioner<BlockWriter>
    +{
    +  /**
    +   * Directory under application directory where blocks gets stored
    +   */
    +  public static final String SUBDIR_BLOCKS = "blocks";
    +  /**
    +   * List of FileBlockMetadata received in the current window.
    +   */
    +  private transient List<BlockMetadata.FileBlockMetadata> blockMetadatas;
    +
    +  /**
    +   * Input port to receive Block meta data
    +   */
    +  public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blockMetadataInput = new DefaultInputPort<BlockMetadata.FileBlockMetadata>()
    +  {
    +
    +    @Override
    +    public void process(BlockMetadata.FileBlockMetadata blockMetadata)
    +    {
    +      blockMetadatas.add(blockMetadata);
    +      LOG.debug("received blockId {} for file {} ", blockMetadata.getBlockId(), blockMetadata.getFilePath());
    +    }
    +  };
    +
    +  /**
    +   * Output port to send Block meta data to downstream operator
    +   */
    +  public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blockMetadataOutput = new DefaultOutputPort<BlockMetadata.FileBlockMetadata>();
    +
    +  public BlockWriter()
    +  {
    +    super();
    +    blockMetadatas = Lists.newArrayList();
    +    //The base class puts a restriction that the file-path cannot be null. With this block writer it is
    +    //being initialized in setup and not through configuration. So setting it to empty string.
    +    filePath = "";
    +  }
    +
    +  /**
    +   * Also, initializes the filePath based on Application path
    +   */
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    filePath = context.getValue(Context.DAGContext.APPLICATION_PATH) + Path.SEPARATOR + SUBDIR_BLOCKS;
    +    super.setup(context);
    +  }
    +
    +  /**
    +   * Finalizes files for all the blockMetaDatas received during current window
    +   */
    +  @Override
    +  public void endWindow()
    +  {
    +    super.endWindow();
    +
    +    streamsCache.asMap().clear();
    +    endOffsets.clear();
    +
    +    for (BlockMetadata.FileBlockMetadata blockMetadata : blockMetadatas) {
    +      try {
    +        finalizeFile(Long.toString(blockMetadata.getBlockId()));
    +      } catch (IOException e) {
    +        throw new RuntimeException(e);
    +      }
    +      blockMetadataOutput.emit(blockMetadata);
    +    }
    +    blockMetadatas.clear();
    +  }
    +
    +  @Override
    +  protected String getFileName(AbstractBlockReader.ReaderRecord<Slice> tuple)
    +  {
    +    return Long.toString(tuple.getBlockId());
    +  }
    +
    +  @Override
    +  protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<Slice> tuple)
    +  {
    +    return tuple.getRecord().buffer;
    +  }
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(BlockWriter.class);
    --- End diff --
    
    moved at the end.


> HDFS output module for file copy
> --------------------------------
>
>                 Key: APEXMALHAR-2013
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2013
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Yogi Devendra
>            Assignee: Yogi Devendra
>
> To write files to HDFS using block-by-block approach.
> Main use-case being to copy the files. Thus, original sequence of blocks has to be maintained.

> To achieve this goal, this module would use information emitted by HDFS input module (APEXMALHAR-2008) viz. FileMetaData, BlockMetaData, BlockData.
>   



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message