Subject: svn commit: r1432185 - in /hadoop/common/branches/branch-1: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/hdfs/org/apache/hadoop/hdfs/tools/ src/test/ src/test/org/apa...
Date: Fri, 11 Jan 2013 17:39:26 -0000
To: common-commits@hadoop.apache.org
From: sradia@apache.org

Author: sradia
Date: Fri Jan 11 17:39:25 2013
New Revision: 1432185

URL: http://svn.apache.org/viewvc?rev=1432185&view=rev
Log:
HDFS-4256 Backport concatenation of files into a single file to branch-1 (sanjay Radia)

Added:
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/tools/HDFSConcat.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-1/src/test/commit-tests
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1432185&r1=1432184&r2=1432185&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Jan 11 17:39:25 2013
@@ -61,6 +61,10 @@ Release 1.2.0 - unreleased
     NetworkTopology with NodeGroup and use generic code for choosing datanode
     in Balancer. (Junping Du via szetszwo)
+
+    HDFS-4256 Backport concatenation of files into a single file to branch-1
+    (sanjay Radia)
+
   IMPROVEMENTS

     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1432185&r1=1432184&r2=1432185&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Jan 11 17:39:25 2013
@@ -944,6 +944,19 @@ public class DFSClient implements FSCons
                                      DSQuotaExceededException.class);
     }
   }
+
+  /**
+   * Move blocks from src to trg and delete src
+   * See {@link ClientProtocol#concat(String, String [])}.
+   */
+  public void concat(String trg, String [] srcs) throws IOException {
+    checkOpen();
+    try {
+      namenode.concat(trg, srcs);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class);
+    }
+  }

   /**
    * Rename file or directory.

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1432185&r1=1432184&r2=1432185&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Jan 11 17:39:25 2013
@@ -220,6 +220,24 @@ public class DistributedFileSystem exten
   }

   /**
+   * THIS IS DFS only operations, it is not part of FileSystem
+   * move blocks from srcs to trg
+   * and delete srcs afterwards
+   * all blocks should be the same size
+   * @param trg existing file to append to
+   * @param psrcs list of files (same block size, same replication)
+   * @throws IOException
+   */
+  public void concat(Path trg, Path [] psrcs) throws IOException {
+    String [] srcs = new String [psrcs.length];
+    for(int i=0; i -22) {
+      throw new IOException("Unexpected opcode " + opcode +
+                            " for version " + logVersion);
+    }
+    numOpConcatDelete++;
+    int length = in.readInt();
+    if (length < 3) { // trg, srcs.., timestamp
+      throw new IOException("Incorrect data format. 
" + + "ConcatDelete operation."); + } + String trg = FSImage.readString(in); + int srcSize = length - 1 - 1; //trg and timestamp + String [] srcs = new String [srcSize]; + for(int i=0; i si = new HashSet(); + + // we put the following prerequisite for the operation + // replication and blocks sizes should be the same for ALL the blocks + + // check the target + final INodeFile trgInode = dir.getFileINode(target); + if(trgInode.isUnderConstruction()) { + throw new IllegalArgumentException("concat: target file " + + target + " is under construction"); + } + // per design target shouldn't be empty and all the blocks same size + if(trgInode.blocks.length == 0) { + throw new IllegalArgumentException("concat: target file " + + target + " is empty"); + } + + long blockSize = trgInode.getPreferredBlockSize(); + + // check the end block to be full + final BlockInfo last = trgInode.blocks[trgInode.blocks.length-1]; + if(blockSize != last.getNumBytes()) { + throw new IllegalArgumentException("The last block in " + target + + " is not full; last block size = " + last.getNumBytes() + + " but file block size = " + blockSize); + } + + si.add(trgInode); + short repl = trgInode.getReplication(); + + // now check the srcs + boolean endSrc = false; // final src file doesn't have to have full end block + for(int i=0; i= 0 && srcBlocks[idx].getNumBytes() != blockSize) { + throw new IllegalArgumentException("concat: the soruce file " + + src + " and the target file " + target + + " should have the same blocks sizes: target block size is " + + blockSize + " but the size of source block " + idx + " is " + + srcBlocks[idx].getNumBytes()); + } + + si.add(srcInode); + } + + // make sure no two files are the same + if(si.size() < srcs.length+1) { // trg + srcs + // it means at least two files are the same + throw new IllegalArgumentException( + "concat: at least two of the source files are the same"); + } + + if(NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + + Arrays.toString(srcs) + " to " + target); + } + + dir.concat(target,srcs); + } /** * stores the modification and access time for this inode. Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1432185&r1=1432184&r2=1432185&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Fri Jan 11 17:39:25 2013 @@ -88,7 +88,7 @@ class INodeDirectoryWithQuota extends IN * @param dsQuota diskspace quota to be set * */ - void setQuota(long newNsQuota, long newDsQuota) throws QuotaExceededException { + void setQuota(long newNsQuota, long newDsQuota) { nsQuota = newNsQuota; dsQuota = newDsQuota; } @@ -122,6 +122,15 @@ class INodeDirectoryWithQuota extends IN diskspace += dsDelta; } + /** Update the size of the tree + * + * @param nsDelta the change of the tree size + * @param dsDelta change to disk space occupied + */ + void addSpaceConsumed(long nsDelta, long dsDelta) { + setSpaceConsumed(nsCount + nsDelta, diskspace + dsDelta); + } + /** * Sets namespace and diskspace take by the directory rooted * at this INode. 
This should be used carefully. It does not check Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1432185&r1=1432184&r2=1432185&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Fri Jan 11 17:39:25 2013 @@ -112,6 +112,26 @@ class INodeFile extends INode { } /** + * append array of blocks to this.blocks + */ + void appendBlocks(INodeFile [] inodes, int totalAddedBlocks) { + int size = this.blocks.length; + + BlockInfo[] newlist = new BlockInfo[size + totalAddedBlocks]; + System.arraycopy(this.blocks, 0, newlist, 0, size); + + for(INodeFile in: inodes) { + System.arraycopy(in.blocks, 0, newlist, size, in.blocks.length); + size += in.blocks.length; + } + + for(BlockInfo bi: this.blocks) { + bi.setINode(this); + } + this.blocks = newlist; + } + + /** * add a block to the block list */ void addBlock(BlockInfo newblock) { @@ -136,9 +156,11 @@ class INodeFile extends INode { int collectSubtreeBlocksAndClear(List v) { parent = null; - for (BlockInfo blk : blocks) { - v.add(blk); - blk.setINode(null); + if (blocks != null && v != null) { + for (BlockInfo blk : blocks) { + v.add(blk); + blk.setINode(null); + } } blocks = null; return 1; @@ -171,6 +193,9 @@ class INodeFile extends INode { long diskspaceConsumed(Block[] blkArr) { long size = 0; + if(blkArr == null) + return 0; + for (Block blk : blkArr) { if (blk != null) { size += blk.getNumBytes(); Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1432185&r1=1432184&r2=1432185&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Jan 11 17:39:25 2013 @@ -830,6 +830,13 @@ public class NameNode implements ClientP public long getPreferredBlockSize(String filename) throws IOException { return namesystem.getPreferredBlockSize(filename); } + + /** + * {@inheritDoc} + */ + public void concat(String trg, String[] src) throws IOException { + namesystem.concat(trg, src); + } /** */ Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/tools/HDFSConcat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/tools/HDFSConcat.java?rev=1432185&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/tools/HDFSConcat.java (added) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/tools/HDFSConcat.java Fri Jan 11 17:39:25 2013 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.tools; + + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; + + +public class HDFSConcat { + private final static String def_uri = "hdfs://localhost:9000"; + /** + * @param args + */ + public static void main(String... args) throws IOException { + + if(args.length < 2) { + System.err.println("Usage HDFSConcat target srcs.."); + System.exit(0); + } + + Configuration conf = new Configuration(); + String uri = conf.get("fs.default.name", def_uri); + Path path = new Path(uri); + DistributedFileSystem dfs = + (DistributedFileSystem)FileSystem.get(path.toUri(), conf); + + Path [] srcs = new Path[args.length-1]; + for(int i=1; i 0; if (!fs.mkdirs(fileName.getParent())) { throw new IOException("Mkdirs failed to create " + fileName.getParent().toString()); } FSDataOutputStream out = null; try { - out = fs.create(fileName, replFactor); - byte[] toWrite = new byte[1024]; - Random rb = new Random(seed); - long bytesToWrite = fileLen; - while (bytesToWrite>0) { - rb.nextBytes(toWrite); - int bytesToWriteNext = (1024 0) { + byte[] toWrite = new byte[bufferLen]; + Random rb = new Random(seed); + long bytesToWrite = fileLen; + while (bytesToWrite>0) { + rb.nextBytes(toWrite); + int bytesToWriteNext = (bufferLen < bytesToWrite) ? 
bufferLen + : (int) bytesToWrite; + + out.write(toWrite, 0, bytesToWriteNext); + bytesToWrite -= bytesToWriteNext; + } } - out.close(); - out = null; } finally { - IOUtils.closeStream(out); + if (out != null) { + out.close(); + } } } Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1432185&r1=1432184&r2=1432185&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Jan 11 17:39:25 2013 @@ -270,6 +270,8 @@ public class TestDFSClientRetries extend public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {} public boolean rename(String src, String dst) throws IOException { return false; } + + public void concat(String trg, String[] srcs) throws IOException {} public boolean delete(String src) throws IOException { return false; } Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java?rev=1432185&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java (added) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java Fri Jan 11 17:39:25 2013 @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.tools.DFSAdmin; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestHDFSConcat { + public static final Log LOG = LogFactory.getLog(TestHDFSConcat.class); + + private static final short REPL_FACTOR = 2; + + private MiniDFSCluster cluster; + private NameNode nn; + private DistributedFileSystem dfs; + + private static long blockSize = 512; + + + private static Configuration conf; + + static { + conf = new Configuration(); + conf.setLong("dfs.block.size", blockSize); + } + + @Before + public void startUpCluster() throws IOException { + cluster = new MiniDFSCluster(conf, REPL_FACTOR, true, null); + assertNotNull("Failed Cluster Creation", cluster); + cluster.waitClusterUp(); + dfs = (DistributedFileSystem) cluster.getFileSystem(); + assertNotNull("Failed to get FileSystem", dfs); + nn = cluster.getNameNode(); + assertNotNull("Failed to get NameNode", nn); + } + + @After + public void shutDownCluster() throws IOException { + if(dfs != null) { + dfs.close(); + } + if(cluster != null) { + cluster.shutdownDataNodes(); + cluster.shutdown(); + } + } + + /** + * Concatenates 10 files into one + * Verifies the final size, deletion of the file, number of blocks + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testConcat() throws IOException, InterruptedException { + final int numFiles = 10; + long fileLen = blockSize*3; + HdfsFileStatus fStatus; + FSDataInputStream stm; + + String trg = new String("/trg"); + Path trgPath = new Path(trg); + DFSTestUtil.createFile(dfs, trgPath, fileLen, REPL_FACTOR, 1); + fStatus = nn.getFileInfo(trg); + long trgLen = fStatus.getLen(); + long trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount(); + + Path [] files = new Path[numFiles]; + byte [] [] bytes = new byte [numFiles][(int)fileLen]; + LocatedBlocks [] lblocks = new LocatedBlocks[numFiles]; + long [] lens = new long [numFiles]; + + + int i = 0; + for(i=0; i
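
For readers of this patch, here is a minimal sketch of how a client could call the new DFS-only concat API added above. The class name, paths, and configuration are illustrative and not part of r1432185; it assumes the target and source files already exist on HDFS with the same block size and replication, with full blocks as required by the checks in FSNamesystem.concat.

    // Illustrative sketch only; not part of the committed patch.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class ConcatUsageExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // concat is a DistributedFileSystem-only operation, not part of FileSystem.
        if (!(fs instanceof DistributedFileSystem)) {
          throw new IllegalStateException("fs.default.name does not point at HDFS");
        }
        DistributedFileSystem dfs = (DistributedFileSystem) fs;

        Path target = new Path("/user/demo/part-0");      // existing, non-empty target
        Path[] srcs = { new Path("/user/demo/part-1"),    // sources are moved onto the
                        new Path("/user/demo/part-2") };  // target and then deleted
        dfs.concat(target, srcs);
      }
    }

The HDFSConcat tool added by this commit drives the same call from the command line; per its usage string it takes a target path followed by one or more source paths.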