Subject: svn commit: r1558599 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
Date: Wed, 15 Jan 2014 23:15:24 -0000
To: hdfs-commits@hadoop.apache.org
From: brandonli@apache.org

Author: brandonli
Date: Wed Jan 15 23:15:24 2014
New Revision: 1558599

URL: http://svn.apache.org/r1558599
Log:
HDFS-5775. Consolidate the code for serialization in CacheManager. Contributed by Haohui Mai

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1558599&r1=1558598&r2=1558599&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Jan 15 23:15:24 2014
@@ -481,6 +481,9 @@ Trunk (Unreleased)
     HDFS-5768. Consolidate the serialization code in DelegationTokenSecretManager
     (Haohui Mai via brandonli)
 
+    HDFS-5775. Consolidate the code for serialization in CacheManager
+    (Haohui Mai via brandonli)
+
 Release 2.4.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
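For readers skimming the diffs below: this change applies the same pattern as HDFS-5768 did for DelegationTokenSecretManager. The public entry points are renamed saveStateCompat/loadStateCompat and reduced to thin wrappers that delegate to a private inner SerializerCompat class, which now owns the legacy fsimage wire format. A minimal, self-contained sketch of that shape follows; the names Manager and nextId here are hypothetical simplified stand-ins, not the real Hadoop types beyond what the diff itself shows.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class Manager {
      // Stand-in for CacheManager's nextDirectiveId: state the inner class captures.
      private long nextId = 42;

      private final SerializerCompat serializerCompat = new SerializerCompat();

      // Public entry points keep a *Compat name and simply delegate.
      public void saveStateCompat(DataOutputStream out) throws IOException {
        serializerCompat.save(out);
      }

      public void loadStateCompat(DataInput in) throws IOException {
        serializerCompat.load(in);
      }

      // A non-static inner class reads and writes the enclosing instance's
      // fields directly, so the serialization code moves wholesale without
      // adding accessors or extra parameters.
      private final class SerializerCompat {
        private void save(DataOutputStream out) throws IOException {
          out.writeLong(nextId);
        }

        private void load(DataInput in) throws IOException {
          nextId = in.readLong();
        }
      }

      public static void main(String[] args) throws IOException {
        Manager m = new Manager();
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        m.saveStateCompat(new DataOutputStream(buf));
        m.nextId = 0; // clobber, then restore from the serialized bytes
        m.loadStateCompat(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(m.nextId); // prints 42: round-trips through the compat format
      }
    }

Keeping the old format behind a *Compat facade also means the call sites in FSImageFormat change only by the method rename, which is exactly what the FSImageFormat.java hunks at the end of this mail show.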
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1558599&r1=1558598&r2=1558599&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Wed Jan 15 23:15:24 2014
@@ -160,6 +160,8 @@ public final class CacheManager {
    */
   private final ReentrantLock crmLock = new ReentrantLock();
 
+  private final SerializerCompat serializerCompat = new SerializerCompat();
+
   /**
    * The CacheReplicationMonitor.
    */
@@ -926,11 +928,9 @@ public final class CacheManager {
    * @param sdPath path of the storage directory
    * @throws IOException
    */
-  public void saveState(DataOutputStream out, String sdPath)
+  public void saveStateCompat(DataOutputStream out, String sdPath)
       throws IOException {
-    out.writeLong(nextDirectiveId);
-    savePools(out, sdPath);
-    saveDirectives(out, sdPath);
+    serializerCompat.save(out, sdPath);
   }
 
   /**
@@ -939,105 +939,117 @@ public final class CacheManager {
    * @param in DataInput from which to restore state
    * @throws IOException
    */
-  public void loadState(DataInput in) throws IOException {
-    nextDirectiveId = in.readLong();
-    // pools need to be loaded first since directives point to their parent pool
-    loadPools(in);
-    loadDirectives(in);
-  }
-
-  /**
-   * Save cache pools to fsimage
-   */
-  private void savePools(DataOutputStream out,
-      String sdPath) throws IOException {
-    StartupProgress prog = NameNode.getStartupProgress();
-    Step step = new Step(StepType.CACHE_POOLS, sdPath);
-    prog.beginStep(Phase.SAVING_CHECKPOINT, step);
-    prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size());
-    Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
-    out.writeInt(cachePools.size());
-    for (CachePool pool: cachePools.values()) {
-      FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true));
-      counter.increment();
-    }
-    prog.endStep(Phase.SAVING_CHECKPOINT, step);
-  }
-
-  /*
-   * Save cache entries to fsimage
-   */
-  private void saveDirectives(DataOutputStream out, String sdPath)
-      throws IOException {
-    StartupProgress prog = NameNode.getStartupProgress();
-    Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
-    prog.beginStep(Phase.SAVING_CHECKPOINT, step);
-    prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size());
-    Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
-    out.writeInt(directivesById.size());
-    for (CacheDirective directive : directivesById.values()) {
-      FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo());
-      counter.increment();
-    }
-    prog.endStep(Phase.SAVING_CHECKPOINT, step);
-  }
-
-  /**
-   * Load cache pools from fsimage
-   */
-  private void loadPools(DataInput in)
-      throws IOException {
-    StartupProgress prog = NameNode.getStartupProgress();
-    Step step = new Step(StepType.CACHE_POOLS);
-    prog.beginStep(Phase.LOADING_FSIMAGE, step);
-    int numberOfPools = in.readInt();
-    prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
-    Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
-    for (int i = 0; i < numberOfPools; i++) {
-      addCachePool(FSImageSerialization.readCachePoolInfo(in));
-      counter.increment();
-    }
-    prog.endStep(Phase.LOADING_FSIMAGE, step);
+  public void loadStateCompat(DataInput in) throws IOException {
+    serializerCompat.load(in);
   }
 
-  /**
-   * Load cache directives from the fsimage
-   */
-  private void loadDirectives(DataInput in) throws IOException {
-    StartupProgress prog = NameNode.getStartupProgress();
-    Step step = new Step(StepType.CACHE_ENTRIES);
-    prog.beginStep(Phase.LOADING_FSIMAGE, step);
-    int numDirectives = in.readInt();
-    prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
-    Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
-    for (int i = 0; i < numDirectives; i++) {
-      CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
-      // Get pool reference by looking it up in the map
-      final String poolName = info.getPool();
-      CachePool pool = cachePools.get(poolName);
-      if (pool == null) {
-        throw new IOException("Directive refers to pool " + poolName +
-            ", which does not exist.");
-      }
-      CacheDirective directive =
-          new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
-              info.getReplication(), info.getExpiration().getAbsoluteMillis());
-      boolean addedDirective = pool.getDirectiveList().add(directive);
-      assert addedDirective;
-      if (directivesById.put(directive.getId(), directive) != null) {
-        throw new IOException("A directive with ID " + directive.getId() +
-            " already exists");
-      }
-      List<CacheDirective> directives =
-          directivesByPath.get(directive.getPath());
-      if (directives == null) {
-        directives = new LinkedList<CacheDirective>();
-        directivesByPath.put(directive.getPath(), directives);
+  private final class SerializerCompat {
+    private void save(DataOutputStream out, String sdPath) throws IOException {
+      out.writeLong(nextDirectiveId);
+      savePools(out, sdPath);
+      saveDirectives(out, sdPath);
+    }
+
+    private void load(DataInput in) throws IOException {
+      nextDirectiveId = in.readLong();
+      // pools need to be loaded first since directives point to their parent pool
+      loadPools(in);
+      loadDirectives(in);
+    }
+
+    /**
+     * Save cache pools to fsimage
+     */
+    private void savePools(DataOutputStream out,
+        String sdPath) throws IOException {
+      StartupProgress prog = NameNode.getStartupProgress();
+      Step step = new Step(StepType.CACHE_POOLS, sdPath);
+      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
+      prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size());
+      Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
+      out.writeInt(cachePools.size());
+      for (CachePool pool: cachePools.values()) {
+        FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true));
+        counter.increment();
+      }
+      prog.endStep(Phase.SAVING_CHECKPOINT, step);
+    }
+
+    /*
+     * Save cache entries to fsimage
+     */
+    private void saveDirectives(DataOutputStream out, String sdPath)
+        throws IOException {
+      StartupProgress prog = NameNode.getStartupProgress();
+      Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
+      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
+      prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size());
+      Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
+      out.writeInt(directivesById.size());
+      for (CacheDirective directive : directivesById.values()) {
+        FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo());
+        counter.increment();
+      }
+      prog.endStep(Phase.SAVING_CHECKPOINT, step);
+    }
+
+    /**
+     * Load cache pools from fsimage
+     */
+    private void loadPools(DataInput in)
+        throws IOException {
+      StartupProgress prog = NameNode.getStartupProgress();
+      Step step = new Step(StepType.CACHE_POOLS);
+      prog.beginStep(Phase.LOADING_FSIMAGE, step);
+      int numberOfPools = in.readInt();
+      prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
+      Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
+      for (int i = 0; i < numberOfPools; i++) {
+        addCachePool(FSImageSerialization.readCachePoolInfo(in));
+        counter.increment();
+      }
+      prog.endStep(Phase.LOADING_FSIMAGE, step);
+    }
+
+    /**
+     * Load cache directives from the fsimage
+     */
+    private void loadDirectives(DataInput in) throws IOException {
+      StartupProgress prog = NameNode.getStartupProgress();
+      Step step = new Step(StepType.CACHE_ENTRIES);
+      prog.beginStep(Phase.LOADING_FSIMAGE, step);
+      int numDirectives = in.readInt();
+      prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
+      Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
+      for (int i = 0; i < numDirectives; i++) {
+        CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
+        // Get pool reference by looking it up in the map
+        final String poolName = info.getPool();
+        CachePool pool = cachePools.get(poolName);
+        if (pool == null) {
+          throw new IOException("Directive refers to pool " + poolName +
+              ", which does not exist.");
+        }
+        CacheDirective directive =
+            new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
+                info.getReplication(), info.getExpiration().getAbsoluteMillis());
+        boolean addedDirective = pool.getDirectiveList().add(directive);
+        assert addedDirective;
+        if (directivesById.put(directive.getId(), directive) != null) {
+          throw new IOException("A directive with ID " + directive.getId() +
+              " already exists");
+        }
+        List<CacheDirective> directives =
+            directivesByPath.get(directive.getPath());
+        if (directives == null) {
+          directives = new LinkedList<CacheDirective>();
+          directivesByPath.put(directive.getPath(), directives);
+        }
+        directives.add(directive);
+        counter.increment();
       }
-      directives.add(directive);
-      counter.increment();
+      prog.endStep(Phase.LOADING_FSIMAGE, step);
     }
-    prog.endStep(Phase.LOADING_FSIMAGE, step);
   }
 
   public void waitForRescanIfNeeded() {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1558599&r1=1558598&r2=1558599&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Wed Jan 15 23:15:24 2014
@@ -878,7 +878,7 @@ public class FSImageFormat {
       if (!LayoutVersion.supports(Feature.CACHING, imgVersion)) {
         return;
       }
-      namesystem.getCacheManager().loadState(in);
+      namesystem.getCacheManager().loadStateCompat(in);
     }
 
     private int getLayoutVersion() {
@@ -1034,7 +1034,7 @@ public class FSImageFormat {
       context.checkCancelled();
       sourceNamesystem.saveSecretManagerStateCompat(out, sdPath);
       context.checkCancelled();
-      sourceNamesystem.getCacheManager().saveState(out, sdPath);
+      sourceNamesystem.getCacheManager().saveStateCompat(out, sdPath);
       context.checkCancelled();
       out.flush();
       context.checkCancelled();
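One detail worth noting in SerializerCompat.load() above: pools are deserialized before directives because each directive record carries only its parent pool's name, which must resolve against an already-populated pool map. A compact, self-contained illustration of that ordering constraint; the record layout here (a pool is just a name, a directive just a pool name) is a hypothetical simplification, not the real fsimage format:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class LoadOrderSketch {
      public static void main(String[] args) throws IOException {
        // Write one pool, then one directive that names it: pools first on disk.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(1); out.writeUTF("pool1");   // pool section
        out.writeInt(1); out.writeUTF("pool1");   // directive section
        load(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
      }

      static void load(DataInput in) throws IOException {
        Map<String, Object> pools = new HashMap<>();
        int numPools = in.readInt();
        for (int i = 0; i < numPools; i++) {
          pools.put(in.readUTF(), new Object()); // register each pool by name
        }
        int numDirectives = in.readInt();
        for (int i = 0; i < numDirectives; i++) {
          String poolName = in.readUTF();
          // Mirrors CacheManager's check: a directive naming an unknown pool is fatal.
          if (!pools.containsKey(poolName)) {
            throw new IOException("Directive refers to pool " + poolName
                + ", which does not exist.");
          }
        }
        System.out.println("loaded " + numDirectives + " directive(s) OK");
      }
    }

Reversing the two sections would make every directive fail the lookup, which is why load() pins the order internally rather than leaving it to callers.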