Date: Fri, 18 Mar 2016 09:06:33 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: dev@apex.incubator.apache.org
Subject: [jira] [Commented] (APEXMALHAR-2013) HDFS output module for file copy

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15201214#comment-15201214 ]

ASF GitHub Bot commented on APEXMALHAR-2013:
--------------------------------------------

Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/216#discussion_r56627887

    --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/FileStitcher.java ---
    @@ -0,0 +1,408 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.lib.io.fs;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Queue;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.PathFilter;
    +
    +import com.google.common.collect.Queues;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.DAGContext;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.io.fs.Synchronizer.StitchBlock;
    +import com.datatorrent.lib.io.fs.Synchronizer.StitchedFileMetaData;
    +
    +/**
    + * This is a generic File Stitcher which can be used to merge data from one or
    + * more files into a single stitched file. StitchedFileMetaData defines the
    + * constituents of the stitched file.
    + *
    + * This class uses Reconciler to
    + */
    +public class FileStitcher extends AbstractReconciler
    +{
    +  /**
    +   * Filesystem on which application is running
    +   */
    +  protected transient FileSystem appFS;
    +
    +  /**
    +   * Destination file system
    +   */
    +  protected transient FileSystem outputFS;
    +
    +  /**
    +   * Path for destination directory
    +   */
    +  @NotNull
    +  protected String filePath;
    +
    +  /**
    +   * Path for blocks directory
    +   */
    +  protected transient String blocksDir;
    +
    +  protected static final String PART_FILE_EXTENTION = "._COPYING_";
    +
    +  /**
    +   * Queue maintaining successful files
    +   */
    +  protected Queue successfulFiles = Queues.newLinkedBlockingQueue();
    +
    +  /**
    +   * Queue maintaining skipped files
    +   */
    +  protected Queue skippedFiles = Queues.newLinkedBlockingQueue();
    +
    +  /**
    +   * Queue maintaining failed files
    +   */
    +  protected Queue failedFiles = Queues.newLinkedBlockingQueue();
    +
    +  /**
    +   * Output port for emitting completed stitched files metadata
    +   */
    +  @OutputPortFieldAnnotation(optional = true)
    +  public final transient DefaultOutputPort completedFilesMetaOutput = new DefaultOutputPort();
    +
    +  private boolean writeChecksum = true;
    +  protected transient Path tempOutFilePath;
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +
    +    blocksDir = context.getValue(DAGContext.APPLICATION_PATH) + Path.SEPARATOR + BlockWriter.SUBDIR_BLOCKS;
    +
    +    try {
    +      outputFS = getOutputFSInstance();
    +      outputFS.setWriteChecksum(writeChecksum);
    +    } catch (IOException ex) {
    +      throw new RuntimeException("Exception in getting output file system.", ex);
    +    }
    +    try {
    +      appFS = getAppFSInstance();
    +    } catch (IOException ex) {
    +      try {
    +        outputFS.close();
    +      } catch (IOException e) {
    +        throw new RuntimeException("Exception in closing output file system.", e);
    +      }
    +      throw new RuntimeException("Exception in getting application file system.", ex);
    +    }
    +
    +    super.setup(context); // Calling it at the end as the reconciler thread uses resources allocated above.
    +  }
    +
    +  /*
    +   * Calls super.endWindow() and sets counters
    +   * @see com.datatorrent.api.BaseOperator#endWindow()
    +   */
    +  @Override
    +  public void endWindow()
    +  {
    +    T stitchedFileMetaData;
    +    int size = doneTuples.size();
    +    for (int i = 0; i < size; i++) {
    +      stitchedFileMetaData = doneTuples.peek();
    +      // If a tuple is present in doneTuples, it must also be present in successful/failed/skipped,
    +      // as processCommittedData adds the tuple to successful/failed/skipped
    +      // and then the reconciler thread adds it to doneTuples
    +      if (successfulFiles.contains(stitchedFileMetaData)) {
    +        successfulFiles.remove(stitchedFileMetaData);
    +        LOG.debug("File copy successful: {}", stitchedFileMetaData.getStitchedFileRelativePath());
    +      } else if (skippedFiles.contains(stitchedFileMetaData)) {
    +        skippedFiles.remove(stitchedFileMetaData);
    +        LOG.debug("File copy skipped: {}", stitchedFileMetaData.getStitchedFileRelativePath());
    +      } else if (failedFiles.contains(stitchedFileMetaData)) {
    +        failedFiles.remove(stitchedFileMetaData);
    +        LOG.debug("File copy failed: {}", stitchedFileMetaData.getStitchedFileRelativePath());
    +      } else {
    +        throw new RuntimeException("Tuple present in doneTuples but not in successfulFiles: "
    +            + stitchedFileMetaData.getStitchedFileRelativePath());
    +      }
    +      completedFilesMetaOutput.emit(stitchedFileMetaData);
    +      committedTuples.remove(stitchedFileMetaData);
    +      doneTuples.poll();
    +    }
    +  }
    +
    +  /**
    +   *
    +   * @return Application FileSystem instance
    +   * @throws IOException
    +   */
    +  protected FileSystem getAppFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance((new Path(blocksDir)).toUri(), new Configuration());
    +  }
    +
    +  /**
    +   *
    +   * @return Destination FileSystem instance
    +   * @throws IOException
    +   */
    +  protected FileSystem getOutputFSInstance() throws IOException
    +  {
    +    return FileSystem.newInstance((new Path(filePath)).toUri(), new Configuration());
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    super.teardown();
    +
    +    boolean gotException = false;
    +    try {
    +      if (appFS != null) {
    +        appFS.close();
    +        appFS = null;
    +      }
    +    } catch (IOException e) {
    +      gotException = true;
    +    }
    +
    +    try {
    +      if (outputFS != null) {
    +        outputFS.close();
    +        outputFS = null;
    +      }
    +    } catch (IOException e) {
    +      gotException = true;
    +    }
    +    if (gotException) {
    +      throw new RuntimeException("Exception while closing file systems.");
    +    }
    +  }
    +
    +  /**
    +   * Enqueues incoming data for processing
    +   */
    +  @Override
    +  protected void processTuple(T stitchedFileMetaData)
    +  {
    +    enqueueForProcessing(stitchedFileMetaData);
    +  }
    +
    +  /**
    +   * Stitches the output file when all blocks for that file are committed
    +   */
    +  @Override
    +  protected void processCommittedData(T stitchedFileMetaData)
    +  {
    +    try {
    +      mergeOutputFile(stitchedFileMetaData);
    +    } catch (IOException e) {
    +      throw new RuntimeException("Unable to merge file: " + stitchedFileMetaData.getStitchedFileRelativePath(), e);
    +    }
    +  }
    +
    +  /**
    +   * Read data from block files and write to output file. Information about
    +   * which block files should be read is specified in outFileMetadata
    +   *
    +   * @param stitchedFileMetaData
    +   * @throws IOException
    +   */
    +
    +  protected void mergeOutputFile(T stitchedFileMetaData) throws IOException
    +  {
    +    mergeBlocks(stitchedFileMetaData);
    +    successfulFiles.add(stitchedFileMetaData);
    +    LOG.debug("Completed processing file: {} ", stitchedFileMetaData.getStitchedFileRelativePath());
    +  }
    +
    +  protected void mergeBlocks(T stitchedFileMetaData) throws IOException
    +  {
    +    //when writing to tmp files there can be vagrant tmp files which we have to clean
    +    final Path dst = new Path(filePath, stitchedFileMetaData.getStitchedFileRelativePath());
    +    PathFilter tempFileFilter = new PathFilter()
    +    {
    +      @Override
    +      public boolean accept(Path path)
    +      {
    +        return path.getName().startsWith(dst.getName()) && path.getName().endsWith(PART_FILE_EXTENTION);
    +      }
    +    };
    +    if (outputFS.exists(dst.getParent())) {
    +      FileStatus[] statuses = outputFS.listStatus(dst.getParent(), tempFileFilter);
    +      for (FileStatus status : statuses) {
    +        String statusName = status.getPath().getName();
    +        LOG.debug("deleting vagrant file {}", statusName);
    +        outputFS.delete(status.getPath(), true);
    +      }
    +    }
    +    tempOutFilePath = new Path(filePath,
    +        stitchedFileMetaData.getStitchedFileRelativePath() + '.' + System.currentTimeMillis() + PART_FILE_EXTENTION);
    +    try {
    +      writeTempOutputFile(stitchedFileMetaData);
    +      moveToFinalFile(stitchedFileMetaData);
    +    } catch (BlockNotFoundException e) {
    +      LOG.info("Block file {} not found. Assuming recovery mode for file {}. ", e.getBlockPath(),
    +          stitchedFileMetaData.getStitchedFileRelativePath());
    +      //Remove temp output file
    +      outputFS.delete(tempOutFilePath, false);
    +    }
    +  }
    +
    +  /**
    +   * Writing all Stitch blocks to temporary file
    +   *
    +   * @param stitchedFileMetaData
    +   * @throws IOException
    +   * @throws BlockNotFoundException
    +   */
    +  protected OutputStream writeTempOutputFile(T stitchedFileMetaData) throws IOException, BlockNotFoundException
    +  {
    +    OutputStream outputStream = getOutputStream(tempOutFilePath);
    +    try {
    +      for (StitchBlock outputBlock : stitchedFileMetaData.getStitchBlocksList()) {
    +        outputBlock.writeTo(appFS, blocksDir, outputStream);
    +      }
    +    } finally {
    +      outputStream.close();
    +    }
    +    return outputStream;
    +  }
    +
    +  protected OutputStream getOutputStream(Path partFilePath) throws IOException
    +  {
    +    return outputFS.create(partFilePath);
    +  }
    +
    +  /**
    +   * Moving temp output file to final file
    +   *
    +   * @param stitchedFileMetaData
    +   * @throws IOException
    +   */
    +  protected void moveToFinalFile(T stitchedFileMetaData) throws IOException
    +  {
    +    Path destination = new Path(filePath, stitchedFileMetaData.getStitchedFileRelativePath());
    +    moveToFinalFile(tempOutFilePath, destination);
    +  }
    +
    +  /**
    +   * Moving temp output file to final file
    +   *
    +   * @param tempOutFilePath
    +   *          Temporary output file
    +   * @param destination
    +   *          Destination directory path
    +   * @throws IOException
    +   */
    +  protected void moveToFinalFile(Path tempOutFilePath, Path destination) throws IOException
    +  {
    +    Path src = Path.getPathWithoutSchemeAndAuthority(tempOutFilePath);
    +    Path dst = Path.getPathWithoutSchemeAndAuthority(destination);
    +
    +    boolean moveSuccessful = false;
    +    if (!outputFS.exists(dst.getParent())) {
    +      outputFS.mkdirs(dst.getParent());
    +    }
    +    if (outputFS.exists(dst)) {
    +      outputFS.delete(dst, false);
    +    }
    +    moveSuccessful = outputFS.rename(src, dst);
    +
    +    if (moveSuccessful) {
    +      LOG.debug("File {} moved successfully to destination folder.", dst);
    +    } else {
    +      throw new RuntimeException("Unable to move file from " + src + " to " + dst);
    +    }
    +  }
    +
    +  public String getBlocksDir()
    +  {
    +    return blocksDir;
    +  }
    +
    +  public void setBlocksDir(String blocksDir)
    --- End diff --

    Just put Javadocs on all getters/setters.

> HDFS output module for file copy
> --------------------------------
>
>                 Key: APEXMALHAR-2013
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2013
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Yogi Devendra
>            Assignee: Yogi Devendra
>
> To write files to HDFS using a block-by-block approach.
> The main use case is copying files, so the original sequence of blocks has to be maintained.
> To achieve this goal, this module would use information emitted by the HDFS input module (APEXMALHAR-2008), viz. FileMetaData, BlockMetaData, BlockData.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
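The core of the diff (`mergeBlocks`, `writeTempOutputFile` and `moveToFinalFile`) follows a write-to-temp-then-rename pattern: leftover `._COPYING_` part files for the destination are deleted, the blocks are concatenated in list order into `<relative path>.<timestamp>._COPYING_`, and that part file is then moved onto the final path. The sketch below reproduces this flow on a local disk, assuming `java.nio.file` stands in for the Hadoop `FileSystem` and each block is an ordinary file. `LocalStitchSketch` and `stitch` are hypothetical names, not Malhar API.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical local-disk sketch of the stitch flow; not the Malhar FileStitcher API.
public class LocalStitchSketch
{
  // Same suffix as PART_FILE_EXTENTION in the diff.
  static final String PART_FILE_EXTENSION = "._COPYING_";

  /** Concatenates the blocks, in list order, into dst via a temporary part file. */
  public static void stitch(List<Path> blocks, Path dst) throws IOException
  {
    Path parent = dst.toAbsolutePath().getParent();
    Files.createDirectories(parent);
    String name = dst.getFileName().toString();

    // 1. Delete part files ("<name>.<millis>._COPYING_") left by an interrupted attempt.
    Pattern strayPattern = Pattern.compile(
        Pattern.quote(name) + "\\.\\d+" + Pattern.quote(PART_FILE_EXTENSION));
    List<Path> stray = new ArrayList<>();
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(parent,
        p -> strayPattern.matcher(p.getFileName().toString()).matches())) {
      for (Path p : entries) {
        stray.add(p); // collect first, delete after the directory stream is closed
      }
    }
    for (Path p : stray) {
      Files.deleteIfExists(p);
    }

    // 2. Write every block, in order, to a uniquely named part file.
    Path temp = parent.resolve(name + "." + System.currentTimeMillis() + PART_FILE_EXTENSION);
    try (OutputStream out = Files.newOutputStream(temp)) {
      for (Path block : blocks) {
        Files.copy(block, out);
      }
    } catch (IOException e) {
      Files.deleteIfExists(temp); // do not leave a half-written part file behind
      throw e;
    }

    // 3. Move the finished part file onto the destination, replacing any old copy.
    Files.move(temp, dst, StandardCopyOption.REPLACE_EXISTING);
  }

  public static void main(String[] args) throws IOException
  {
    Path dir = Files.createTempDirectory("stitch");
    Path b0 = Files.write(dir.resolve("block-0"), "Hello, ".getBytes(StandardCharsets.UTF_8));
    Path b1 = Files.write(dir.resolve("block-1"), "world".getBytes(StandardCharsets.UTF_8));
    Path dst = dir.resolve("out").resolve("greeting.txt");
    stitch(Arrays.asList(b0, b1), dst);
    System.out.println(new String(Files.readAllBytes(dst), StandardCharsets.UTF_8)); // Hello, world
  }
}
```

Running `main` prints `Hello, world`. One deliberate difference from the `PathFilter` in the diff, which accepts any name that starts with the destination name and ends with `._COPYING_`: the sketch only matches the exact `<name>.<digits>._COPYING_` shape, so it leaves alone part files of a sibling such as `greeting.txt.bak`.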
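In the diff, `endWindow` drains as many entries of `doneTuples` as it holds when the method starts. For each tuple it finds the one of `successfulFiles`, `skippedFiles` or `failedFiles` that holds it, and only then emits it and polls it off `doneTuples`. The sketch below strips that bookkeeping down to JDK queues, assuming file-name strings in place of `StitchedFileMetaData` and a returned list in place of the output port. `DoneTupleSketch` and `drain` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the endWindow bookkeeping; file names stand in for StitchedFileMetaData.
public class DoneTupleSketch
{
  // Status queues, filled by the thread that processes committed data.
  final Queue<String> successful = new LinkedBlockingQueue<>();
  final Queue<String> skipped = new LinkedBlockingQueue<>();
  final Queue<String> failed = new LinkedBlockingQueue<>();
  // Tuples that thread has finished with, in completion order.
  final Queue<String> done = new LinkedBlockingQueue<>();

  /** Drains the tuples that are done now and returns "status:name" entries in order. */
  List<String> drain()
  {
    List<String> emitted = new ArrayList<>();
    // Snapshot the size: tuples finished during the drain wait for the next call.
    int size = done.size();
    for (int i = 0; i < size; i++) {
      String file = done.peek(); // leave it queued until it is classified
      String status;
      if (successful.remove(file)) {
        status = "successful";
      } else if (skipped.remove(file)) {
        status = "skipped";
      } else if (failed.remove(file)) {
        status = "failed";
      } else {
        throw new IllegalStateException("Done tuple has no status: " + file);
      }
      emitted.add(status + ":" + file); // stands in for completedFilesMetaOutput.emit(...)
      done.poll();
    }
    return emitted;
  }

  public static void main(String[] args)
  {
    DoneTupleSketch s = new DoneTupleSketch();
    s.successful.add("a.txt");
    s.failed.add("b.txt");
    s.done.add("a.txt");
    s.done.add("b.txt");
    System.out.println(s.drain()); // [successful:a.txt, failed:b.txt]
  }
}
```

Each `contains` + `remove` pair from the diff is collapsed into a single `Collection.remove`, which returns `true` only if the element was present. The `committedTuples.remove(...)` call is omitted because the sketch has no reconciler.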