Return-Path: Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: (qmail 62505 invoked from network); 29 Nov 2010 02:48:21 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 29 Nov 2010 02:48:21 -0000 Received: (qmail 45287 invoked by uid 500); 29 Nov 2010 02:48:21 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 45200 invoked by uid 500); 29 Nov 2010 02:48:21 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 45192 invoked by uid 99); 29 Nov 2010 02:48:20 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Nov 2010 02:48:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Nov 2010 02:48:15 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C4A1B23888EA; Mon, 29 Nov 2010 02:46:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1040005 - in /hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode: FSImageCompression.java FSImageFormat.java FSImageSerialization.java Date: Mon, 29 Nov 2010 02:46:41 -0000 To: hdfs-commits@hadoop.apache.org From: eli@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101129024641.C4A1B23888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: eli Date: Mon Nov 29 02:46:41 2010 New Revision: 1040005 URL: http://svn.apache.org/viewvc?rev=1040005&view=rev Log: Add missing files for HDFS-1473. Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java?rev=1040005&view=auto ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java (added) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java Mon Nov 29 02:46:41 2010 @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; + +import org.apache.hadoop.io.Text; + +/** + * Simple container class that handles support for compressed fsimage files. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class FSImageCompression { + + /** Codec to use to save or load image, or null if the image is not compressed */ + private CompressionCodec imageCodec; + + /** + * Create a "noop" compression - i.e. uncompressed + */ + private FSImageCompression() { + } + + /** + * Create compression using a particular codec + */ + private FSImageCompression(CompressionCodec codec) { + imageCodec = codec; + } + + /** + * Create a "noop" compression - i.e. uncompressed + */ + public static FSImageCompression createNoopCompression() { + return new FSImageCompression(); + } + + /** + * Create a compression instance based on the user's configuration in the given + * Configuration object. + * @throws IOException if the specified codec is not available. + */ + public static FSImageCompression createCompression(Configuration conf) + throws IOException { + boolean compressImage = conf.getBoolean( + DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, + DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT); + + if (!compressImage) { + return createNoopCompression(); + } + + String codecClassName = conf.get( + DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY, + DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_DEFAULT); + return createCompression(conf, codecClassName); + } + + /** + * Create a compression instance using the codec specified by + * codecClassName + */ + private static FSImageCompression createCompression(Configuration conf, + String codecClassName) + throws IOException { + + CompressionCodecFactory factory = new CompressionCodecFactory(conf); + CompressionCodec codec = factory.getCodecByClassName(codecClassName); + if (codec == null) { + throw new IOException("Not a supported codec: " + codecClassName); + } + + return new FSImageCompression(codec); + } + + /** + * Create a compression instance based on a header read from an input stream. + * @throws IOException if the specified codec is not available or the + * underlying IO fails. + */ + public static FSImageCompression readCompressionHeader( + Configuration conf, + DataInputStream dis) throws IOException + { + boolean isCompressed = dis.readBoolean(); + + if (!isCompressed) { + return createNoopCompression(); + } else { + String codecClassName = Text.readString(dis); + return createCompression(conf, codecClassName); + } + } + + /** + * Unwrap a compressed input stream by wrapping it with a decompressor based + * on this codec. If this instance represents no compression, simply adds + * buffering to the input stream. + * @return a buffered stream that provides uncompressed data + * @throws IOException If the decompressor cannot be instantiated or an IO + * error occurs. + */ + public DataInputStream unwrapInputStream(InputStream is) throws IOException { + if (imageCodec != null) { + return new DataInputStream(imageCodec.createInputStream(is)); + } else { + return new DataInputStream(new BufferedInputStream(is)); + } + } + + /** + * Write out a header to the given stream that indicates the chosen + * compression codec, and return the same stream wrapped with that codec. + * If no codec is specified, simply adds buffering to the stream, so that + * the returned stream is always buffered. + * + * @param os The stream to write header to and wrap. This stream should + * be unbuffered. + * @return A stream wrapped with the specified compressor, or buffering + * if compression is not enabled. + * @throws IOException if an IO error occurs or the compressor cannot be + * instantiated + */ + public DataOutputStream writeHeaderAndWrapStream(OutputStream os) + throws IOException { + DataOutputStream dos = new DataOutputStream(os); + + dos.writeBoolean(imageCodec != null); + + if (imageCodec != null) { + String codecClassName = imageCodec.getClass().getCanonicalName(); + Text.writeString(dos, codecClassName); + + return new DataOutputStream(imageCodec.createOutputStream(os)); + } else { + // use a buffered output stream + return new DataOutputStream(new BufferedOutputStream(os)); + } + } + + @Override + public String toString() { + if (imageCodec != null) { + return "codec " + imageCodec.getClass().getCanonicalName(); + } else { + return "no compression"; + } + } +} \ No newline at end of file Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1040005&view=auto ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (added) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Mon Nov 29 02:46:41 2010 @@ -0,0 +1,498 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.apache.hadoop.hdfs.server.common.Util.now; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.DigestInputStream; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.FSConstants; +import org.apache.hadoop.hdfs.server.common.GenerationStamp; +import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.io.Text; + +/** + * Contains inner classes for reading or writing the on-disk format for FSImages + */ +public abstract class FSImageFormat { + private static final Log LOG = FSImage.LOG; + + /** + * A one-shot class responsible for loading an image. The load() function + * should be called once, after which the getter methods may be used to retrieve + * information about the image that was loaded, if loading was successful. + */ + public static class Loader { + private final Configuration conf; + + /** Set to true once a file has been loaded using this loader. */ + private boolean loaded = false; + + /** The image version of the loaded file */ + private int imgVersion; + /** The namespace ID of the loaded file */ + private int imgNamespaceID; + /** The MD5 sum of the loaded file */ + private MD5Hash imgDigest; + + public Loader(Configuration conf) { + this.conf = conf; + } + + /** + * Return the version number of the image that has been loaded. + * @throws IllegalStateException if load() has not yet been called. + */ + int getLoadedImageVersion() { + checkLoaded(); + return imgVersion; + } + + /** + * Return the MD5 checksum of the image that has been loaded. + * @throws IllegalStateException if load() has not yet been called. + */ + MD5Hash getLoadedImageMd5() { + checkLoaded(); + return imgDigest; + } + + /** + * Return the namespace ID of the image that has been loaded. + * @throws IllegalStateException if load() has not yet been called. + */ + int getLoadedNamespaceID() { + checkLoaded(); + return imgNamespaceID; + } + + /** + * Throw IllegalStateException if load() has not yet been called. + */ + private void checkLoaded() { + if (!loaded) { + throw new IllegalStateException("Image not yet loaded!"); + } + } + + /** + * Throw IllegalStateException if load() has already been called. + */ + private void checkNotLoaded() { + if (loaded) { + throw new IllegalStateException("Image already loaded!"); + } + } + + void load(File curFile, FSNamesystem targetNamesystem) + throws IOException + { + checkNotLoaded(); + assert curFile != null : "curFile is null"; + + long startTime = now(); + FSDirectory fsDir = targetNamesystem.dir; + + // + // Load in bits + // + MessageDigest digester = MD5Hash.getDigester(); + DigestInputStream fin = new DigestInputStream( + new FileInputStream(curFile), digester); + + DataInputStream in = new DataInputStream(fin); + try { + /* + * Note: Remove any checks for version earlier than + * Storage.LAST_UPGRADABLE_LAYOUT_VERSION since we should never get + * to here with older images. + */ + + /* + * TODO we need to change format of the image file + * it should not contain version and namespace fields + */ + // read image version: first appeared in version -1 + imgVersion = in.readInt(); + + // read namespaceID: first appeared in version -2 + imgNamespaceID = in.readInt(); + + // read number of files + long numFiles = readNumFiles(in); + + // read in the last generation stamp. + if (imgVersion <= -12) { + long genstamp = in.readLong(); + targetNamesystem.setGenerationStamp(genstamp); + } + + // read compression related info + FSImageCompression compression; + if (imgVersion <= -25) { // -25: 1st version providing compression option + compression = FSImageCompression.readCompressionHeader(conf, in); + } else { + compression = FSImageCompression.createNoopCompression(); + } + in = compression.unwrapInputStream(fin); + + LOG.info("Loading image file " + curFile + " using " + compression); + + + // read file info + short replication = targetNamesystem.getDefaultReplication(); + + LOG.info("Number of files = " + numFiles); + + byte[][] pathComponents; + byte[][] parentPath = {{}}; + INodeDirectory parentINode = fsDir.rootDir; + for (long i = 0; i < numFiles; i++) { + long modificationTime = 0; + long atime = 0; + long blockSize = 0; + pathComponents = FSImageSerialization.readPathComponents(in); + replication = in.readShort(); + replication = targetNamesystem.adjustReplication(replication); + modificationTime = in.readLong(); + if (imgVersion <= -17) { + atime = in.readLong(); + } + if (imgVersion <= -8) { + blockSize = in.readLong(); + } + int numBlocks = in.readInt(); + Block blocks[] = null; + + // for older versions, a blocklist of size 0 + // indicates a directory. + if ((-9 <= imgVersion && numBlocks > 0) || + (imgVersion < -9 && numBlocks >= 0)) { + blocks = new Block[numBlocks]; + for (int j = 0; j < numBlocks; j++) { + blocks[j] = new Block(); + if (-14 < imgVersion) { + blocks[j].set(in.readLong(), in.readLong(), + GenerationStamp.GRANDFATHER_GENERATION_STAMP); + } else { + blocks[j].readFields(in); + } + } + } + // Older versions of HDFS does not store the block size in inode. + // If the file has more than one block, use the size of the + // first block as the blocksize. Otherwise use the default block size. + // + if (-8 <= imgVersion && blockSize == 0) { + if (numBlocks > 1) { + blockSize = blocks[0].getNumBytes(); + } else { + long first = ((numBlocks == 1) ? blocks[0].getNumBytes(): 0); + blockSize = Math.max(targetNamesystem.getDefaultBlockSize(), first); + } + } + + // get quota only when the node is a directory + long nsQuota = -1L; + if (imgVersion <= -16 && blocks == null && numBlocks == -1) { + nsQuota = in.readLong(); + } + long dsQuota = -1L; + if (imgVersion <= -18 && blocks == null && numBlocks == -1) { + dsQuota = in.readLong(); + } + + // Read the symlink only when the node is a symlink + String symlink = ""; + if (imgVersion <= -23 && numBlocks == -2) { + symlink = Text.readString(in); + } + + PermissionStatus permissions = targetNamesystem.getUpgradePermission(); + if (imgVersion <= -11) { + permissions = PermissionStatus.read(in); + } + + if (isRoot(pathComponents)) { // it is the root + // update the root's attributes + if (nsQuota != -1 || dsQuota != -1) { + fsDir.rootDir.setQuota(nsQuota, dsQuota); + } + fsDir.rootDir.setModificationTime(modificationTime); + fsDir.rootDir.setPermissionStatus(permissions); + continue; + } + // check if the new inode belongs to the same parent + if(!isParent(pathComponents, parentPath)) { + parentINode = null; + parentPath = getParent(pathComponents); + } + // add new inode + // without propagating modification time to parent + parentINode = fsDir.addToParent(pathComponents, parentINode, permissions, + blocks, symlink, replication, modificationTime, + atime, nsQuota, dsQuota, blockSize, false); + } + + // load datanode info + this.loadDatanodes(in); + + // load Files Under Construction + this.loadFilesUnderConstruction(in, targetNamesystem); + + this.loadSecretManagerState(in, targetNamesystem); + + } finally { + in.close(); + } + + imgDigest = new MD5Hash(digester.digest()); + loaded = true; + + LOG.info("Image file of size " + curFile.length() + " loaded in " + + (now() - startTime)/1000 + " seconds."); + } + + + private void loadDatanodes(DataInputStream in) throws IOException { + if (imgVersion > -3) // pre datanode image version + return; + if (imgVersion <= -12) { + return; // new versions do not store the datanodes any more. + } + int size = in.readInt(); + for(int i = 0; i < size; i++) { + // We don't need to add these descriptors any more. + FSImageSerialization.DatanodeImage.skipOne(in); + } + } + + private void loadFilesUnderConstruction(DataInputStream in, + FSNamesystem fs) throws IOException { + FSDirectory fsDir = fs.dir; + if (imgVersion > -13) // pre lease image version + return; + int size = in.readInt(); + + LOG.info("Number of files under construction = " + size); + + for (int i = 0; i < size; i++) { + INodeFileUnderConstruction cons = + FSImageSerialization.readINodeUnderConstruction(in); + + // verify that file exists in namespace + String path = cons.getLocalName(); + INode old = fsDir.getFileINode(path); + if (old == null) { + throw new IOException("Found lease for non-existent file " + path); + } + if (old.isDirectory()) { + throw new IOException("Found lease for directory " + path); + } + INodeFile oldnode = (INodeFile) old; + fsDir.replaceNode(path, oldnode, cons); + fs.leaseManager.addLease(cons.getClientName(), path); + } + } + + private void loadSecretManagerState(DataInputStream in, + FSNamesystem fs) throws IOException { + if (imgVersion > -23) { + //SecretManagerState is not available. + //This must not happen if security is turned on. + return; + } + fs.loadSecretManagerState(in); + } + + + private long readNumFiles(DataInputStream in) throws IOException { + if (imgVersion <= -16) { + return in.readLong(); + } else { + return in.readInt(); + } + } + + private boolean isRoot(byte[][] path) { + return path.length == 1 && + path[0] == null; + } + + private boolean isParent(byte[][] path, byte[][] parent) { + if (path == null || parent == null) + return false; + if (parent.length == 0 || path.length != parent.length + 1) + return false; + boolean isParent = true; + for (int i = 0; i < parent.length; i++) { + isParent = isParent && Arrays.equals(path[i], parent[i]); + } + return isParent; + } + + /** + * Return string representing the parent of the given path. + */ + String getParent(String path) { + return path.substring(0, path.lastIndexOf(Path.SEPARATOR)); + } + + byte[][] getParent(byte[][] path) { + byte[][] result = new byte[path.length - 1][]; + for (int i = 0; i < result.length; i++) { + result[i] = new byte[path[i].length]; + System.arraycopy(path[i], 0, result[i], 0, path[i].length); + } + return result; + } + } + + /** + * A one-shot class responsible for writing an image file. + * The write() function should be called once, after which the getter + * functions may be used to retrieve information about the file that was written. + */ + static class Writer { + /** Set to true once an image has been written */ + private boolean written = false; + + /** The MD5 checksum of the file that was written */ + private MD5Hash writtenDigest; + + static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR); + + /** @throws IllegalStateException if the instance has not yet written an image */ + private void checkWritten() { + if (!written) { + throw new IllegalStateException("FSImageWriter has not written an image"); + } + } + + /** @throws IllegalStateException if the instance has already written an image */ + private void checkNotWritten() { + if (written) { + throw new IllegalStateException("FSImageWriter has already written an image"); + } + } + + /** + * Return the MD5 checksum of the image file that was saved. + */ + MD5Hash getWrittenDigest() { + checkWritten(); + return writtenDigest; + } + + void write(File newFile, + FSNamesystem sourceNamesystem, + FSImageCompression compression) + throws IOException { + checkNotWritten(); + + FSDirectory fsDir = sourceNamesystem.dir; + long startTime = now(); + // + // Write out data + // + MessageDigest digester = MD5Hash.getDigester(); + FileOutputStream fout = new FileOutputStream(newFile); + DigestOutputStream fos = new DigestOutputStream(fout, digester); + DataOutputStream out = new DataOutputStream(fos); + try { + out.writeInt(FSConstants.LAYOUT_VERSION); + out.writeInt(sourceNamesystem.getFSImage().getNamespaceID()); // TODO bad dependency + out.writeLong(fsDir.rootDir.numItemsInTree()); + out.writeLong(sourceNamesystem.getGenerationStamp()); + + // write compression info and set up compressed stream + out = compression.writeHeaderAndWrapStream(fos); + LOG.info("Saving image file " + newFile + + " using " + compression); + + + byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH]; + ByteBuffer strbuf = ByteBuffer.wrap(byteStore); + // save the root + FSImageSerialization.saveINode2Image(strbuf, fsDir.rootDir, out); + // save the rest of the nodes + saveImage(strbuf, 0, fsDir.rootDir, out); + sourceNamesystem.saveFilesUnderConstruction(out); + sourceNamesystem.saveSecretManagerState(out); + strbuf = null; + + out.flush(); + fout.getChannel().force(true); + } finally { + out.close(); + } + + written = true; + // set md5 of the saved image + writtenDigest = new MD5Hash(digester.digest()); + + LOG.info("Image file of size " + newFile.length() + " saved in " + + (now() - startTime)/1000 + " seconds."); + } + + /** + * Save file tree image starting from the given root. + * This is a recursive procedure, which first saves all children of + * a current directory and then moves inside the sub-directories. + */ + private static void saveImage(ByteBuffer parentPrefix, + int prefixLength, + INodeDirectory current, + DataOutputStream out) throws IOException { + int newPrefixLength = prefixLength; + if (current.getChildrenRaw() == null) + return; + for(INode child : current.getChildren()) { + // print all children first + parentPrefix.position(prefixLength); + parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes()); + FSImageSerialization.saveINode2Image(parentPrefix, child, out); + } + for(INode child : current.getChildren()) { + if(!child.isDirectory()) + continue; + parentPrefix.position(prefixLength); + parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes()); + newPrefixLength = parentPrefix.position(); + saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out); + } + parentPrefix.position(prefixLength); + } + } +} Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1040005&view=auto ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (added) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Mon Nov 29 02:46:41 2010 @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DeprecatedUTF8; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * Static utility functions for serializing various pieces of data in the correct + * format for the FSImage file. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class FSImageSerialization { + + /** + * In order to reduce allocation, we reuse some static objects. However, the methods + * in this class should be thread-safe since image-saving is multithreaded, so + * we need to keep the static objects in a thread-local. + */ + static private final ThreadLocal TL_DATA = + new ThreadLocal() { + @Override + protected TLData initialValue() { + return new TLData(); + } + }; + + /** + * Simple container "struct" for threadlocal data. + */ + static private final class TLData { + final DeprecatedUTF8 U_STR = new DeprecatedUTF8(); + final FsPermission FILE_PERM = new FsPermission((short) 0); + } + + // Helper function that reads in an INodeUnderConstruction + // from the input stream + // + static INodeFileUnderConstruction readINodeUnderConstruction( + DataInputStream in) throws IOException { + byte[] name = readBytes(in); + short blockReplication = in.readShort(); + long modificationTime = in.readLong(); + long preferredBlockSize = in.readLong(); + int numBlocks = in.readInt(); + BlockInfo[] blocks = new BlockInfo[numBlocks]; + Block blk = new Block(); + int i = 0; + for (; i < numBlocks-1; i++) { + blk.readFields(in); + blocks[i] = new BlockInfo(blk, blockReplication); + } + // last block is UNDER_CONSTRUCTION + if(numBlocks > 0) { + blk.readFields(in); + blocks[i] = new BlockInfoUnderConstruction( + blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null); + } + PermissionStatus perm = PermissionStatus.read(in); + String clientName = readString(in); + String clientMachine = readString(in); + + // These locations are not used at all + int numLocs = in.readInt(); + DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs]; + for (i = 0; i < numLocs; i++) { + locations[i] = new DatanodeDescriptor(); + locations[i].readFields(in); + } + + return new INodeFileUnderConstruction(name, + blockReplication, + modificationTime, + preferredBlockSize, + blocks, + perm, + clientName, + clientMachine, + null); + } + + // Helper function that writes an INodeUnderConstruction + // into the input stream + // + static void writeINodeUnderConstruction(DataOutputStream out, + INodeFileUnderConstruction cons, + String path) + throws IOException { + writeString(path, out); + out.writeShort(cons.getReplication()); + out.writeLong(cons.getModificationTime()); + out.writeLong(cons.getPreferredBlockSize()); + int nrBlocks = cons.getBlocks().length; + out.writeInt(nrBlocks); + for (int i = 0; i < nrBlocks; i++) { + cons.getBlocks()[i].write(out); + } + cons.getPermissionStatus().write(out); + writeString(cons.getClientName(), out); + writeString(cons.getClientMachine(), out); + + out.writeInt(0); // do not store locations of last block + } + + /* + * Save one inode's attributes to the image. + */ + static void saveINode2Image(ByteBuffer name, + INode node, + DataOutputStream out) throws IOException { + int nameLen = name.position(); + out.writeShort(nameLen); + out.write(name.array(), name.arrayOffset(), nameLen); + FsPermission filePerm = TL_DATA.get().FILE_PERM; + if (node.isDirectory()) { + out.writeShort(0); // replication + out.writeLong(node.getModificationTime()); + out.writeLong(0); // access time + out.writeLong(0); // preferred block size + out.writeInt(-1); // # of blocks + out.writeLong(node.getNsQuota()); + out.writeLong(node.getDsQuota()); + filePerm.fromShort(node.getFsPermissionShort()); + PermissionStatus.write(out, node.getUserName(), + node.getGroupName(), + filePerm); + } else if (node.isLink()) { + out.writeShort(0); // replication + out.writeLong(0); // modification time + out.writeLong(0); // access time + out.writeLong(0); // preferred block size + out.writeInt(-2); // # of blocks + Text.writeString(out, ((INodeSymlink)node).getLinkValue()); + filePerm.fromShort(node.getFsPermissionShort()); + PermissionStatus.write(out, node.getUserName(), + node.getGroupName(), + filePerm); + } else { + INodeFile fileINode = (INodeFile)node; + out.writeShort(fileINode.getReplication()); + out.writeLong(fileINode.getModificationTime()); + out.writeLong(fileINode.getAccessTime()); + out.writeLong(fileINode.getPreferredBlockSize()); + Block[] blocks = fileINode.getBlocks(); + out.writeInt(blocks.length); + for (Block blk : blocks) + blk.write(out); + filePerm.fromShort(fileINode.getFsPermissionShort()); + PermissionStatus.write(out, fileINode.getUserName(), + fileINode.getGroupName(), + filePerm); + } + } + + // This should be reverted to package private once the ImageLoader + // code is moved into this package. This method should not be called + // by other code. + @SuppressWarnings("deprecation") + public static String readString(DataInputStream in) throws IOException { + DeprecatedUTF8 ustr = TL_DATA.get().U_STR; + ustr.readFields(in); + return ustr.toString(); + } + + static String readString_EmptyAsNull(DataInputStream in) throws IOException { + final String s = readString(in); + return s.isEmpty()? null: s; + } + + @SuppressWarnings("deprecation") + static void writeString(String str, DataOutputStream out) throws IOException { + DeprecatedUTF8 ustr = TL_DATA.get().U_STR; + ustr.set(str); + ustr.write(out); + } + + + // Same comments apply for this method as for readString() + @SuppressWarnings("deprecation") + public static byte[] readBytes(DataInputStream in) throws IOException { + DeprecatedUTF8 ustr = TL_DATA.get().U_STR; + ustr.readFields(in); + int len = ustr.getLength(); + byte[] bytes = new byte[len]; + System.arraycopy(ustr.getBytes(), 0, bytes, 0, len); + return bytes; + } + + /** + * Reading the path from the image and converting it to byte[][] directly + * this saves us an array copy and conversions to and from String + * @param in + * @return the array each element of which is a byte[] representation + * of a path component + * @throws IOException + */ + @SuppressWarnings("deprecation") + public static byte[][] readPathComponents(DataInputStream in) + throws IOException { + DeprecatedUTF8 ustr = TL_DATA.get().U_STR; + + ustr.readFields(in); + return DFSUtil.bytes2byteArray(ustr.getBytes(), + ustr.getLength(), (byte) Path.SEPARATOR_CHAR); + } + + /** + * DatanodeImage is used to store persistent information + * about datanodes into the fsImage. + */ + static class DatanodeImage implements Writable { + DatanodeDescriptor node = new DatanodeDescriptor(); + + static void skipOne(DataInput in) throws IOException { + DatanodeImage nodeImage = new DatanodeImage(); + nodeImage.readFields(in); + } + + ///////////////////////////////////////////////// + // Writable + ///////////////////////////////////////////////// + /** + * Public method that serializes the information about a + * Datanode to be stored in the fsImage. + */ + public void write(DataOutput out) throws IOException { + new DatanodeID(node).write(out); + out.writeLong(node.getCapacity()); + out.writeLong(node.getRemaining()); + out.writeLong(node.getLastUpdate()); + out.writeInt(node.getXceiverCount()); + } + + /** + * Public method that reads a serialized Datanode + * from the fsImage. + */ + public void readFields(DataInput in) throws IOException { + DatanodeID id = new DatanodeID(); + id.readFields(in); + long capacity = in.readLong(); + long remaining = in.readLong(); + long lastUpdate = in.readLong(); + int xceiverCount = in.readInt(); + + // update the DatanodeDescriptor with the data we read in + node.updateRegInfo(id); + node.setStorageID(id.getStorageID()); + node.setCapacity(capacity); + node.setRemaining(remaining); + node.setLastUpdate(lastUpdate); + node.setXceiverCount(xceiverCount); + } + } +} \ No newline at end of file