Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 61101 invoked from network); 19 Jun 2008 06:42:12 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 19 Jun 2008 06:42:12 -0000 Received: (qmail 133 invoked by uid 500); 19 Jun 2008 06:42:14 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 112 invoked by uid 500); 19 Jun 2008 06:42:14 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 103 invoked by uid 99); 19 Jun 2008 06:42:14 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Jun 2008 23:42:14 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Jun 2008 06:41:32 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id CCD2823889BB; Wed, 18 Jun 2008 23:41:50 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r669400 - in /hadoop/core/trunk: ./ src/hdfs/org/apache/hadoop/dfs/ Date: Thu, 19 Jun 2008 06:41:50 -0000 To: core-commits@hadoop.apache.org From: dhruba@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080619064150.CCD2823889BB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dhruba Date: Wed Jun 18 23:41:50 2008 New Revision: 669400 URL: http://svn.apache.org/viewvc?rev=669400&view=rev Log: HADOOP-2703. Refactor the distributed upgrade code so that it is easier to identify datanode and namenode related code. (dhruba) Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampStatsUpgradeCommand.java hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampUpgradeDatanode.java hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampUpgradeNamenode.java Removed: hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampUpgrade.java Modified: hadoop/core/trunk/CHANGES.txt Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=669400&r1=669399&r2=669400&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Wed Jun 18 23:41:50 2008 @@ -12,6 +12,9 @@ BUG FIXES + HADOOP-2703. Refactor the distributed upgrade code so that it is + easier to identify datanode and namenode related code. (dhruba) + Release 0.18.0 - Unreleased INCOMPATIBLE CHANGES Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampStatsUpgradeCommand.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampStatsUpgradeCommand.java?rev=669400&view=auto ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampStatsUpgradeCommand.java (added) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampStatsUpgradeCommand.java Wed Jun 18 23:41:50 2008 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.dfs; + +import java.io.*; + +/** + * The Datanode sends this statistics object to the Namenode periodically + * during a Generation Stamp Upgrade. + */ +class GenerationStampStatsUpgradeCommand extends UpgradeCommand { + DatanodeID datanodeId; + int blocksUpgraded; + int blocksRemaining; + int errors; + + GenerationStampStatsUpgradeCommand() { + super(GenerationStampUpgradeNamenode.DN_CMD_STATS, 0, (short)0); + datanodeId = new DatanodeID(); + } + + public GenerationStampStatsUpgradeCommand(short status, DatanodeID dn, + int blocksUpgraded, int blocksRemaining, + int errors, int version) { + super(GenerationStampUpgradeNamenode.DN_CMD_STATS, version, status); + //copy so that only ID part gets serialized + datanodeId = new DatanodeID(dn); + this.blocksUpgraded = blocksUpgraded; + this.blocksRemaining = blocksRemaining; + this.errors = errors; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + datanodeId.readFields(in); + blocksUpgraded = in.readInt(); + blocksRemaining = in.readInt(); + errors = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + datanodeId.write(out); + out.writeInt(blocksUpgraded); + out.writeInt(blocksRemaining); + out.writeInt(errors); + } +} + Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampUpgradeDatanode.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampUpgradeDatanode.java?rev=669400&view=auto ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampUpgradeDatanode.java (added) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampUpgradeDatanode.java Wed Jun 18 23:41:50 2008 @@ -0,0 +1,447 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.dfs; + +import java.io.*; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; + +import org.apache.commons.logging.*; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.*; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; + +/** + * This class associates a block generation stamp with with block. This + * generation stamp is written to each metadata file. Please see + * HADOOP-1700 for details. + */ +class GenerationStampUpgradeDatanode extends UpgradeObjectDatanode { + + public static final Log LOG = + LogFactory.getLog("org.apache.hadoop.dfs.GenerationStampUpgrade"); + + DatanodeProtocol namenode; + InetSocketAddress namenodeAddr; + + // stats + private AtomicInteger blocksPreviouslyUpgraded = new AtomicInteger(0); + private AtomicInteger blocksToUpgrade = new AtomicInteger(0); + private AtomicInteger blocksUpgraded = new AtomicInteger(0); + private AtomicInteger errors = new AtomicInteger(0); + + // process the upgrade using a pool of threads. + static private final int poolSize = 4; + + // If no progress has occured during this time, print warnings message. + static private final int LONG_TIMEOUT_MILLISEC = 1*60*1000; // 1 minute + + // This object is needed to indicate that namenode is not running upgrade. + static UpgradeCommand noUpgradeOnNamenode = new UpgradeCommand(); + + private List completedList = new LinkedList(); + + /* This is set when the datanode misses the regular upgrade. + * When this is set, it upgrades the block but stops heartbeating + * to the namenode. + */ + private AtomicBoolean offlineUpgrade = new AtomicBoolean(false); + private AtomicBoolean upgradeCompleted = new AtomicBoolean(false); + + // Implement the common interfaces required by UpgradeObjectDatanode + + public int getVersion() { + return GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION; + } + + /* + * Start upgrade if it not already running. It sends status to + * namenode even if an upgrade is already in progress. + */ + public synchronized UpgradeCommand startUpgrade() throws IOException { + if (offlineUpgrade.get()) { + doUpgrade(); + } + return null; + } + + public String getDescription() { + return "Block Generation Stamp Upgrade at Datanode"; + } + + public short getUpgradeStatus() { + return (blocksToUpgrade.get() == blocksUpgraded.get()) ? 100 : + (short) Math.floor(blocksUpgraded.get()*100.0/blocksToUpgrade.get()); + } + + public UpgradeCommand completeUpgrade() throws IOException { + // return latest stats command. + assert getUpgradeStatus() == 100; + return new GenerationStampStatsUpgradeCommand(getUpgradeStatus(), + getDatanode().dnRegistration, + blocksPreviouslyUpgraded.get() + blocksUpgraded.get(), + blocksToUpgrade.get()-blocksUpgraded.get(), + errors.get(), + GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION); + } + + @Override + boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException { + int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion(); + if(nsUpgradeVersion >= getVersion()) { + return false; // Normal upgrade. + } + + LOG.info("\n This Datanode has missed a cluster wide Block generation Stamp Upgrade." + + "\n Will perform an 'offline' upgrade of the blocks." + + "\n During this time, Datanode does not heartbeat."); + + + // Namenode removes this node from the registered nodes + try { + getDatanode().namenode.errorReport(getDatanode().dnRegistration, + DatanodeProtocol.NOTIFY, + "Performing an offline generation stamp " + + "upgrade. " + + "Will be back online once the ugprade " + + "completes. Please see datanode logs."); + + } catch(IOException ignored) { + LOG.info("\n This Datanode was unable to send error report to namenode."); + } + offlineUpgrade.set(true); + return true; + } + + public GenerationStampUpgradeDatanode() { + blocksPreviouslyUpgraded.set(0); + blocksToUpgrade.set(0); + blocksUpgraded.set(0); + errors.set(0); + } + + static File getPreGenerationMetaFile(File f) { + return new File(f.getAbsolutePath() + FSDataset.METADATA_EXTENSION); + } + + // This class is invoked by the worker thread to convert the + // metafile into the new format + // + class UpgradeExecutor implements Runnable { + Block block; + Throwable throwable; + + UpgradeExecutor(Block b) { + block = b; + } + + public void run() { + try { + // do the real work here + FSDataset dataset = (FSDataset) getDatanode().data; + upgradeToCurVersion(dataset, block); + } catch (Throwable t) { + throwable = t; + } + synchronized (completedList) { + completedList.add(this); + completedList.notify(); + } + } + + /** + * Upgrades the metadata file to current version if required. + * @param dataset + * @param block + */ + void upgradeToCurVersion(FSDataset dataset, Block block) + throws IOException { + File blockFile = dataset.getBlockFile(block); + if (blockFile == null) { + throw new IOException("Could find file for " + block); + } + + File metadataFile = dataset.getMetaFile(block); + File oldmetadataFile = getPreGenerationMetaFile(blockFile); + + if (metadataFile.exists() && oldmetadataFile.exists()) { + // + // If both file exists and are of the same size, + // then delete the old one. If the sizes are not same then + // leave both of them and consider the upgrade as successful. + // + if (metadataFile.length() == oldmetadataFile.length()) { + if (!oldmetadataFile.delete()) { + LOG.info("Unable to delete old metadata file " + oldmetadataFile); + } + } + } else if (metadataFile.exists()) { + // + // Only the new file exists, nothing more to do. + // + return; + } else if (oldmetadataFile.exists()) { + // + // The old file exists but the new one is missing. Rename + // old one to new name. + // + if (!oldmetadataFile.renameTo(metadataFile)) { + throw new IOException("Could find rename " + oldmetadataFile + + " to " + metadataFile); + } + } else { + throw new IOException("Could find any metadata file for " + block); + } + } + } + + // This method iterates through all the blocks on a datanode and + // do the upgrade. + // + void doUpgrade() throws IOException { + + if (upgradeCompleted.get()) { + assert offlineUpgrade.get() : + ("Multiple calls to doUpgrade is expected only during " + + "offline upgrade"); + return; + } + + FSDataset dataset = (FSDataset) getDatanode().data; + + // Set up the retry policy so that each attempt waits for one minute. + Configuration conf = new Configuration(); + // set rpc timeout to one minute. + conf.set("ipc.client.timeout", "60000"); + + RetryPolicy timeoutPolicy = + RetryPolicies.retryUpToMaximumCountWithFixedSleep( + LONG_TIMEOUT_MILLISEC/1000, + 1, TimeUnit.MILLISECONDS); + + Map,RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy); + RetryPolicy methodPolicy = RetryPolicies.retryByException( + RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + Map methodNameToPolicyMap = + new HashMap(); + // do we need to set the policy for connection failures also? + methodNameToPolicyMap.put("processUpgradeCommand", methodPolicy); + + LOG.info("Starting Block Generation Stamp Upgrade on datanode " + + getDatanode()); + + for (;;) { + try { + namenodeAddr = getDatanode().getNameNodeAddr(); + namenode = (DatanodeProtocol) RetryProxy.create( + DatanodeProtocol.class, + RPC.waitForProxy(DatanodeProtocol.class, + DatanodeProtocol.versionID, + namenodeAddr, + conf), + methodNameToPolicyMap); + break; + } catch (IOException e) { + LOG.warn("Generation Stamp Upgrade Exception " + + "while trying to connect to NameNode at " + + getDatanode().getNameNodeAddr().toString() + " : " + + StringUtils.stringifyException(e)); + try { + Thread.sleep(10*1000); + } catch (InterruptedException e1) { + throw new IOException("Interrupted Sleep while creating RPC proxy." + + e1); + } + } + } + LOG.info("Block Generation Stamp Upgrade Datanode connected to " + + "namenode at " + namenodeAddr); + + // Get a list of all the blocks : + LinkedList blockList = new LinkedList(); + + //Fill blockList with blocks to be upgraded. + Block [] blockArr = dataset.getBlockReport(); + + for (Block b : blockArr) { + File blockFile = null; + try { + blockFile = dataset.getBlockFile(b); + } catch (IOException e) { + //The block might just be deleted. ignore it. + LOG.warn("Could not find file location for " + b + + ". It might already be deleted. Exception : " + + StringUtils.stringifyException(e)); + errors.getAndIncrement(); + continue; + } + if (!blockFile.exists()) { + errors.getAndIncrement(); + LOG.error("could not find block file " + blockFile); + continue; + } + File metaFile = dataset.getMetaFile(b); + File oldMetaFile = getPreGenerationMetaFile(blockFile); + if (metaFile.exists()) { + blocksPreviouslyUpgraded.getAndIncrement(); + continue; + } + blocksToUpgrade.getAndIncrement(); + blockList.add(new UpgradeExecutor(b)); + } + blockArr = null; + int nLeft = blockList.size(); + + LOG.info("Starting upgrade of " + blocksToUpgrade.get() + " blocks out of " + + (blocksToUpgrade.get() + blocksPreviouslyUpgraded.get())); + + // Start the pool of upgrade workers + ExecutorService pool = Executors.newFixedThreadPool(poolSize); + for (Iterator it = blockList.iterator(); it.hasNext();) { + pool.submit(it.next()); + } + + // Inform the namenode + sendStatus(); + + // Report status to namenode every so many seconds: + long now = System.currentTimeMillis(); + long statusReportIntervalMilliSec = 30*1000; + long lastStatusReportTime = now; + long lastUpdateTime = now; + long lastWarnTime = now; + + // Now wait for the tasks to complete. + // + while (nLeft > 0) { + synchronized (completedList) { + if (completedList.size() <= 0) { + try { + completedList.wait(1000); + } catch (InterruptedException ignored) {} + } + + now = System.currentTimeMillis(); + + if (completedList.size()> 0) { + UpgradeExecutor exe = completedList.remove(0); + nLeft--; + if (exe.throwable != null) { + errors.getAndIncrement(); + LOG.error("Got an exception during generation stamp upgrade of " + + exe.block + ": " + + StringUtils.stringifyException(exe.throwable)); + } + blocksUpgraded.getAndIncrement(); + lastUpdateTime = now; + } else { + if ((now - lastUpdateTime) >= LONG_TIMEOUT_MILLISEC && + (now - lastWarnTime) >= LONG_TIMEOUT_MILLISEC) { + lastWarnTime = now; + LOG.warn("No block was updated in last " + + (LONG_TIMEOUT_MILLISEC/(60*1000)) + + " minutes! will keep waiting... "); + } + } + } + + if ((now-lastStatusReportTime) > statusReportIntervalMilliSec) { + sendStatus(); + lastStatusReportTime = System.currentTimeMillis(); + } + } + + pool.shutdown(); + upgradeCompleted.set(true); + + LOG.info("Completed Block Generation Stamp Upgrade. Total of " + + (blocksPreviouslyUpgraded.get() + blocksToUpgrade.get()) + + " blocks : " + blocksPreviouslyUpgraded.get() + " blocks previously " + + "upgraded, " + blocksUpgraded.get() + " blocks upgraded this time " + + "with " + errors.get() + " errors."); + + // now inform the name node about the completion. + // What if there is no upgrade running on Namenode now? + while (!sendStatus()); + + } + + /** Sends current status and stats to namenode and logs it to local log*/ + boolean sendStatus() { + LOG.info((offlineUpgrade.get() ? "Offline " : "") + + "Block Generation Stamp Upgrade : " + + getUpgradeStatus() + "% completed."); + if (offlineUpgrade.get()) { + return true; + } + + GenerationStampStatsUpgradeCommand cmd = null; + synchronized (this) { + cmd = new GenerationStampStatsUpgradeCommand(getUpgradeStatus(), + getDatanode().dnRegistration, + blocksPreviouslyUpgraded.get() + blocksUpgraded.get(), + blocksToUpgrade.get()-blocksUpgraded.get(), + errors.get(), + GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION); + } + UpgradeCommand reply = sendCommand(namenodeAddr, namenode, cmd, 0); + if (reply == null) { + LOG.warn("Could not send status to Namenode. Namenode might be " + + "over loaded or down."); + } + return reply != null; + } + + + // Sends a command to the namenode + static UpgradeCommand sendCommand(InetSocketAddress namenodeAddr, + DatanodeProtocol namenode, + UpgradeCommand cmd, int retries) { + for(int i=0; i<=retries || retries<0; i++) { + try { + UpgradeCommand reply = namenode.processUpgradeCommand(cmd); + if (reply == null) { + /* namenode might not be running upgrade or finished + * an upgrade. We just return a static object */ + return noUpgradeOnNamenode; + } + return reply; + } catch (IOException e) { + // print the stack trace only for the last retry. + LOG.warn("Exception to " + namenodeAddr + + " while sending command " + + cmd.getAction() + ": " + e + + ((retries<0 || i>=retries)? "... will retry ..." : + ": " + StringUtils.stringifyException(e))); + } + } + return null; + } +} Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampUpgradeNamenode.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampUpgradeNamenode.java?rev=669400&view=auto ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampUpgradeNamenode.java (added) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/GenerationStampUpgradeNamenode.java Wed Jun 18 23:41:50 2008 @@ -0,0 +1,530 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.dfs; + +import java.io.*; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.*; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.*; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; + +/** + * This class associates a block generation stamp with with block. This + * generation stamp is written to each metadata file. Please see + * HADOOP-1700 for details. + */ +/** + * Once an upgrade starts at the namenode , this class manages the upgrade + * process. + */ +class GenerationStampUpgradeNamenode extends UpgradeObjectNamenode { + + public static final Log LOG = + LogFactory.getLog("org.apache.hadoop.dfs.GenerationStampUpgradeNamenode"); + + static final long inactivityExtension = 10*1000; // 10 seconds + AtomicLong lastNodeCompletionTime = new AtomicLong(0); + + // The layout version before the generation stamp upgrade. + static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13; + + static final int DN_CMD_STATS = 300; + + enum UpgradeStatus { + INITIALIZED, + STARTED, + DATANODES_DONE, + COMPLETED, + } + + UpgradeStatus upgradeStatus = UpgradeStatus.INITIALIZED; + + class DnInfo { + short percentCompleted = 0; + long blocksUpgraded = 0; + long blocksRemaining = 0; + long errors = 0; + + DnInfo(short pcCompleted) { + percentCompleted = status; + } + DnInfo() {} + + void setStats(GenerationStampStatsUpgradeCommand cmd) { + percentCompleted = cmd.getCurrentStatus(); + blocksUpgraded = cmd.blocksUpgraded; + blocksRemaining = cmd.blocksRemaining; + errors = cmd.errors; + } + + boolean isDone() { + return percentCompleted >= 100; + } + } + + /* We should track only the storageIDs and not DatanodeID, which + * includes datanode name and storage id. + */ + HashMap dnMap = new HashMap(); + HashMap unfinishedDnMap = + new HashMap(); + + Daemon monitorThread; + double avgDatanodeCompletionPct = 0; + boolean forceDnCompletion = false; + + //Upgrade object interface: + + public int getVersion() { + return PRE_GENERATIONSTAMP_LAYOUT_VERSION; + } + + public UpgradeCommand completeUpgrade() throws IOException { + return null; + } + + @Override + public String getDescription() { + return "Block Generation Stamp Upgrade at Namenode"; + } + + @Override + public synchronized short getUpgradeStatus() { + // Reserve 10% for deleting files. + if (upgradeStatus == UpgradeStatus.COMPLETED) { + return 100; + } + return (short) avgDatanodeCompletionPct; + } + + @Override + public UpgradeCommand startUpgrade() throws IOException { + + assert monitorThread == null; + lastNodeCompletionTime.set(System.currentTimeMillis()); + + monitorThread = new Daemon(new UpgradeMonitor()); + monitorThread.start(); + return super.startUpgrade(); + } + + @Override + public synchronized void forceProceed() throws IOException { + if (forceDnCompletion) { + LOG.warn("forceProceed is already set for this upgrade. It can take " + + "a short while to take affect. Please wait."); + return; + } + + LOG.info("got forceProceed request for this upgrade. Datanodes upgrade " + + "will be considered done. It can take a few seconds to take " + + "effect."); + forceDnCompletion = true; + } + + @Override + UpgradeCommand processUpgradeCommand(UpgradeCommand command) + throws IOException { + switch (command.getAction()) { + + case GenerationStampUpgradeNamenode.DN_CMD_STATS : + return handleStatsCmd(command); + + default: + throw new IOException("Unknown Command for Generation Stamp Upgrade : " + + command.getAction()); + } + } + + @Override + public UpgradeStatusReport getUpgradeStatusReport(boolean details) + throws IOException { + + /* If 'details' is true should we update block level status? + * It could take multiple minutes + * updateBlckLevelStats()? + */ + + String replyString = ""; + + short status = 0; + + synchronized (this) { + + status = getUpgradeStatus(); + + replyString = String.format( + ((monitorThread == null) ? "\tUpgrade has not been started yet.\n" : "")+ + ((forceDnCompletion) ? "\tForce Proceed is ON\n" : "") + + "\tLast Block Level Stats updated at : %tc\n" + + "\tLast Block Level Stats : %s\n" + + "\tBrief Datanode Status : %s\n" + + "%s", + latestBlockLevelStats.updatedAt, + latestBlockLevelStats.statusString("\n\t "), + printStatus("\n\t "), + ((status < 100 && upgradeStatus == UpgradeStatus.DATANODES_DONE) ? + "\tNOTE: Upgrade at the Datanodes has finished. Deleteing \".crc\" " + + "files\n\tcan take longer than status implies.\n" : "") + ); + + if (details) { + // list all the known data nodes + StringBuilder str = null; + Iterator keys = dnMap.keySet().iterator(); + Iterator values = dnMap.values().iterator(); + + for(; keys.hasNext() && values.hasNext() ;) { + DatanodeID dn = keys.next(); + DnInfo info = values.next(); + String dnStr = "\t\t" + dn.getName() + "\t : " + + info.percentCompleted + " % \t" + + info.blocksUpgraded + " u \t" + + info.blocksRemaining + " r \t" + + info.errors + " e\n"; + if ( str == null ) { + str = new StringBuilder(dnStr.length()* + (dnMap.size() + (dnMap.size()+7)/8)); + } + str.append(dnStr); + } + + replyString += "\n\tDatanode Stats (total: " + dnMap.size() + "): " + + "pct Completion(%) blocks upgraded (u) " + + "blocks remaining (r) errors (e)\n\n" + + (( str == null ) ? + "\t\tThere are no known Datanodes\n" : str); + } + } + return new GenerationStampUpgradeStatusReport( + PRE_GENERATIONSTAMP_LAYOUT_VERSION, + status, replyString); + } + + + /** + * The namenode process a periodic statistics message from the datanode. + */ + private synchronized UpgradeCommand handleStatsCmd(UpgradeCommand cmd) { + + GenerationStampStatsUpgradeCommand stats = (GenerationStampStatsUpgradeCommand)cmd; + + DatanodeID dn = stats.datanodeId; + DnInfo dnInfo = dnMap.get(dn); + boolean alreadyCompleted = (dnInfo != null && dnInfo.isDone()); + + if (dnInfo == null) { + dnInfo = new DnInfo(); + dnMap.put(dn, dnInfo); + LOG.info("Upgrade started/resumed at datanode " + dn.getName()); + } + + dnInfo.setStats(stats); + if (!dnInfo.isDone()) { + unfinishedDnMap.put(dn, dnInfo); + } + + if (dnInfo.isDone() && !alreadyCompleted) { + LOG.info("upgrade completed on datanode " + dn.getName()); + unfinishedDnMap.remove(dn); + if (unfinishedDnMap.size() == 0) { + lastNodeCompletionTime.set(System.currentTimeMillis()); + } + } + + //Should we send any more info? + return new UpgradeCommand(); + } + + public GenerationStampUpgradeNamenode() { + } + + // For now we will wait for all the nodes to complete upgrade. + synchronized boolean isUpgradeDone() { + return upgradeStatus == UpgradeStatus.COMPLETED; + } + + synchronized String printStatus(String spacing) { + //NOTE: iterates on all the datanodes. + + // Calculate % completion on all the data nodes. + long errors = 0; + long totalCompletion = 0; + for( Iterator it = dnMap.values().iterator(); it.hasNext(); ) { + DnInfo dnInfo = it.next(); + totalCompletion += dnInfo.percentCompleted; + errors += dnInfo.errors; + } + + avgDatanodeCompletionPct = totalCompletion/(dnMap.size() + 1e-20); + + String msg = "Avg completion of all Datanodes: " + + String.format("%.2f%%", avgDatanodeCompletionPct) + + " with " + errors + " errors. " + + ((unfinishedDnMap.size() > 0) ? spacing + + unfinishedDnMap.size() + " out of " + dnMap.size() + + " nodes are not done." : ""); + + LOG.info("Generation Stamp Upgrade is " + (isUpgradeDone() ? + "complete. " : "still running. ") + spacing + msg); + return msg; + } + + private synchronized void setStatus(UpgradeStatus status) { + upgradeStatus = status; + } + + /* Checks if upgrade completed based on datanode's status and/or + * if all the blocks are upgraded. + */ + private synchronized UpgradeStatus checkOverallCompletion() { + + if (upgradeStatus == UpgradeStatus.COMPLETED || + upgradeStatus == UpgradeStatus.DATANODES_DONE) { + return upgradeStatus; + } + + if (upgradeStatus != UpgradeStatus.DATANODES_DONE) { + boolean datanodesDone = + (dnMap.size() > 0 && unfinishedDnMap.size() == 0 && + ( System.currentTimeMillis() - lastNodeCompletionTime.get() ) > + inactivityExtension) || forceDnCompletion ; + + if ( datanodesDone ) { + LOG.info("Upgrade of DataNode blocks is complete. " + + ((forceDnCompletion) ? "(ForceDnCompletion is on.)" : "")); + upgradeStatus = UpgradeStatus.DATANODES_DONE; + } + } + + if (upgradeStatus != UpgradeStatus.DATANODES_DONE && + latestBlockLevelStats.updatedAt > 0) { + // check if last block report marked all + if (latestBlockLevelStats.minimallyReplicatedBlocks == 0 && + latestBlockLevelStats.underReplicatedBlocks == 0) { + + LOG.info("Marking datanode upgrade complete since all the blocks are " + + "upgraded (even though some datanodes may not have " + + "reported completion. Block level stats :\n\t" + + latestBlockLevelStats.statusString("\n\t")); + upgradeStatus = UpgradeStatus.DATANODES_DONE; + } + } + + return upgradeStatus; + } + + /** + * This class monitors the upgrade progress and periodically prints + * status message to log. + */ + class UpgradeMonitor implements Runnable { + + static final long statusReportIntervalMillis = 1*60*1000; + static final long blockReportIntervalMillis = 5*60*1000; + static final int sleepTimeSec = 5; + + public void run() { + long lastReportTime = System.currentTimeMillis(); + long lastBlockReportTime = lastReportTime; + + while ( !isUpgradeDone() ) { + UpgradeStatus status = checkOverallCompletion(); + + if (status == UpgradeStatus.DATANODES_DONE) { + setStatus(UpgradeStatus.COMPLETED); + } + + long now = System.currentTimeMillis(); + + + if (now-lastBlockReportTime >= blockReportIntervalMillis) { + updateBlockLevelStats(); + // Check if all the blocks have been upgraded. + lastBlockReportTime = now; + } + + if ((now - lastReportTime) >= statusReportIntervalMillis || + isUpgradeDone()) { + printStatus("\n\t"); + lastReportTime = now; + } + + if (isUpgradeDone()) { + break; + } + + try { + Thread.sleep(sleepTimeSec*1000); + } catch (InterruptedException e) { + break; + } + } + LOG.info("Leaving the Generation Stamp Upgrade Namenode monitor thread"); + } + } + + private BlockLevelStats latestBlockLevelStats = new BlockLevelStats(); + // internal class to hold the stats. + private static class BlockLevelStats { + long fullyReplicatedBlocks = 0; + long minimallyReplicatedBlocks = 0; + long underReplicatedBlocks = 0; // includes unReplicatedBlocks + long unReplicatedBlocks = 0; // zero replicas upgraded + long errors; + long updatedAt; + + String statusString(String spacing) { + long totalBlocks = fullyReplicatedBlocks + + minimallyReplicatedBlocks + + underReplicatedBlocks; + double multiplier = 100/(totalBlocks + 1e-20); + + if (spacing.equals("")) { + spacing = ", "; + } + + return String.format( + "Total Blocks : %d" + + "%sFully Upgragraded : %.2f%%" + + "%sMinimally Upgraded : %.2f%%" + + "%sUnder Upgraded : %.2f%% (includes Un-upgraded blocks)" + + "%sUn-upgraded : %.2f%%" + + "%sErrors : %d", totalBlocks, + spacing, (fullyReplicatedBlocks * multiplier), + spacing, (minimallyReplicatedBlocks * multiplier), + spacing, (underReplicatedBlocks * multiplier), + spacing, (unReplicatedBlocks * multiplier), + spacing, errors); + } + } + + void updateBlockLevelStats(String path, BlockLevelStats stats) { + DFSFileInfo[] fileArr = getFSNamesystem().dir.getListing(path); + + for (DFSFileInfo file:fileArr) { + if (file.isDir()) { + updateBlockLevelStats(file.getPath().toString(), stats); + } else { + // Get the all the blocks. + LocatedBlocks blockLoc = null; + try { + blockLoc = getFSNamesystem().getBlockLocations( + file.getPath().toString(), 0, file.getLen()); + int numBlocks = blockLoc.locatedBlockCount(); + for (int i=0; i= file.getReplication()) { + stats.fullyReplicatedBlocks++; + } else if (numUpgraded >= getFSNamesystem().getMinReplication()) { + stats.minimallyReplicatedBlocks++; + } else { + stats.underReplicatedBlocks++; + } + if (numUpgraded == 0) { + stats.unReplicatedBlocks++; + } + } + } catch (IOException e) { + LOG.error("BlockGenerationStampUpgrade: could not get block locations for " + + file.getPath().toString() + " : " + + StringUtils.stringifyException(e)); + stats.errors++; + } + } + } + } + + void updateBlockLevelStats() { + /* This iterates over all the blocks and updates various + * counts. + * Since iterating over all the blocks at once would be quite + * large operation under lock, we iterate over all the files + * and update the counts for blocks that belong to a file. + */ + + LOG.info("Starting update of block level stats. " + + "This could take a few minutes"); + BlockLevelStats stats = new BlockLevelStats(); + updateBlockLevelStats("/", stats); + stats.updatedAt = System.currentTimeMillis(); + + LOG.info("Block level stats:\n\t" + stats.statusString("\n\t")); + synchronized (this) { + latestBlockLevelStats = stats; + } + } +} + +/** + * A status report object for Generation Stamp Upgrades + */ +class GenerationStampUpgradeStatusReport extends UpgradeStatusReport { + + String extraText = ""; + + public GenerationStampUpgradeStatusReport() { + } + + public GenerationStampUpgradeStatusReport(int version, short status, + String extraText) { + super(version, status, false); + this.extraText = extraText; + } + + @Override + public String getStatusText(boolean details) { + return super.getStatusText(details) + "\n\n" + extraText; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + extraText = Text.readString(in); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Text.writeString(out, extraText); + } +} +