Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 95598 invoked from network); 4 Mar 2011 04:52:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 04:52:20 -0000 Received: (qmail 44672 invoked by uid 500); 4 Mar 2011 04:52:19 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 44641 invoked by uid 500); 4 Mar 2011 04:52:19 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 44630 invoked by uid 99); 4 Mar 2011 04:52:19 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 04:52:19 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 04:52:13 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E462F2388B42; Fri, 4 Mar 2011 04:51:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1077754 - /hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java Date: Fri, 04 Mar 2011 04:51:51 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304045151.E462F2388B42@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: omalley Date: Fri Mar 4 04:51:51 2011 New Revision: 1077754 URL: http://svn.apache.org/viewvc?rev=1077754&view=rev Log: commit f98ac7cfb2224293e5bfe2bde470c8dafe7c13ec Author: Alfred Thompson Date: Thu Sep 30 22:00:30 2010 +0000 This revision of the TestBalancer test case fixes issues with JMX integration. In this rev, the basic balancer scenario is complete in its implementation. This version provides a foundation for the development of further balancer test scenarios. Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java?rev=1077754&r1=1077753&r2=1077754&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java Fri Mar 4 04:51:51 2011 @@ -28,16 +28,15 @@ import java.io.OutputStream; import java.io.PrintStream; import java.net.URI; import java.security.SecureRandom; -import java.util.HashMap; +import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import javax.management.InstanceNotFoundException; -import javax.management.MBeanException; import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; import javax.management.ObjectName; -import javax.management.ReflectionException; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; @@ -56,18 +55,12 @@ import org.apache.hadoop.hdfs.test.syste import org.apache.hadoop.mapreduce.test.system.MRCluster; -import org.apache.hadoop.examples.RandomWriter; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.JobStatus; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapreduce.test.system.JTProtocol; -import org.apache.hadoop.mapreduce.test.system.JobInfo; +import org.apache.hadoop.mapreduce.test.system.JTClient; +import org.apache.hadoop.mapreduce.test.system.TTClient; +import org.apache.hadoop.test.system.AbstractDaemonClient; import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.Assert; @@ -82,20 +75,6 @@ public class TestBalancer { private Configuration hadoopConf; private HDFSCluster dfsCluster; private MRCluster mrCluster; - // TODO don't hardwire these, introspect the cluster - private static final String NAMENODE = "gsbl90277.blue.ygrid.yahoo.com"; - private static final String[] ENDPOINT_JMX = { - "gsbl90277.blue.ygrid.yahoo.com-8008", - "gsbl90276.blue.ygrid.yahoo.com-24812", - "gsbl90275.blue.ygrid.yahoo.com-24810", - "gsbl90274.blue.ygrid.yahoo.com-24808", - "gsbl90273.blue.ygrid.yahoo.com-24806", - "gsbl90272.blue.ygrid.yahoo.com-24804", - "gsbl90271.blue.ygrid.yahoo.com-24802", - "gsbl90270.blue.ygrid.yahoo.com-24800" - }; - private Map endpointMap = - new HashMap(); public TestBalancer() throws Exception { } @@ -108,7 +87,7 @@ public class TestBalancer { //TODO no need for mr cluster anymore mrCluster = MRCluster.createCluster(hadoopConf); mrCluster.setUp(); - connectJMX(); + //connectJMX(); } @After @@ -117,148 +96,7 @@ public class TestBalancer { mrCluster.tearDown(); } - /** Connect to JMX agents on HDFS cluster nodes */ - private void connectJMX() { - final int HOST = 0; - final int PORT = 1; - for (String endpoint : ENDPOINT_JMX) { - String[] toks = endpoint.split("-"); - String host = toks[HOST]; - String port = toks[PORT]; - LOG.info("HOST=" + host + ", PORT=" + port); - MBeanServerConnection jmxEndpoint = getJMXEndpoint(host, port); - endpointMap.put(host, jmxEndpoint); - } - } - - private long getDataNodeFreeSpace(DNClient datanode) { - String dnHost = datanode.getHostName(); - Object volObj = getDNAttribute(dnHost, "VolumeInfo"); - Map volInfoMap = (Map) JSON.parse(volObj.toString()); - long totalFreeSpace = 0L; - for (Object key : volInfoMap.keySet()) { - Map attrMap = (Map) volInfoMap.get(key); - long freeSpace = (Long) attrMap.get("freeSpace"); - //LOG.info( String.format("volume %s has %d bytes free space left", key, freeSpace) ); - totalFreeSpace += freeSpace; - } - //LOG.info(String.format("got from host %s volinfo:\n%s", dnHost, volObj)); - return totalFreeSpace; - } - - private long getDataNodeUsedSpace(DNClient datanode) { - String dnHost = datanode.getHostName(); - LOG.debug("checking DFS space used on host "+dnHost); - Object volObj = getDNAttribute(dnHost, "VolumeInfo"); - LOG.debug("retrieved volume info object "+volObj); - Map volInfoMap = (Map) JSON.parse(volObj.toString()); - long totalUsedSpace = 0L; - for (Object key : volInfoMap.keySet()) { - Map attrMap = (Map) volInfoMap.get(key); - // TODO should we be using free space here? - long usedSpace = (Long) attrMap.get("usedSpace"); - LOG.info( String.format("volume %s has %d bytes used space", key, usedSpace) ); - totalUsedSpace += usedSpace; - } - //LOG.info(String.format("got from host %s volinfo:\n%s", dnHost, volObj)); - return totalUsedSpace; - } - - // TODO just throw the dang exceptions - private Object getDNAttribute(String host, String attribName) { - ObjectName name = null; - Object attribVal = null; - try { - MBeanServerConnection conn = endpointMap.get(host); - name = new ObjectName("HadoopInfo:type=DataNodeInfo"); - attribVal = conn.getAttribute(name, attribName); - } catch (javax.management.AttributeNotFoundException attribNotFoundExc) { - LOG.warn(String.format("no attribute matching %s found", attribName), - attribNotFoundExc); - } catch (javax.management.MalformedObjectNameException badObjNameExc) { - LOG.warn("bad object name: " + name, badObjNameExc); - } catch (javax.management.InstanceNotFoundException instNotFoundExc) { - LOG.warn("no MBean instance found", instNotFoundExc); - } catch (javax.management.ReflectionException reflectExc) { - LOG.warn("reflection error!", reflectExc); - } catch (javax.management.MBeanException mBeanExc) { - LOG.warn("MBean error!", mBeanExc); - } catch (java.io.IOException ioExc) { - LOG.debug("i/o error!", ioExc); - } - return attribVal; - } - //@Test - - public void testJMXRemote() { - final int HOST = 0; - final int PORT = 1; - for (String endpoint : ENDPOINT_JMX) { - String[] toks = endpoint.split("-"); - String host = toks[HOST]; - String port = toks[PORT]; - //LOG.info("HOST="+host+", PORT="+port); - MBeanServerConnection jmxEndpoint = getJMXEndpoint(host, port); - endpointMap.put(host, jmxEndpoint); - } - - - Iterator iter = endpointMap.keySet().iterator(); - while (iter.hasNext()) { - String host = iter.next(); - MBeanServerConnection conn = endpointMap.get(host); - ObjectName mBeanName = null; - try { - if (NAMENODE.equals(host)) { - // TODO make this a constant - mBeanName = new ObjectName("HadoopInfo:type=NameNodeInfo"); - } else { - mBeanName = new ObjectName("HadoopInfo:type=DataNodeInfo"); - } - Object versionObj = conn.getAttribute(mBeanName, "Version"); - LOG.info("host [" + host + "] runs version " + versionObj); - } catch (javax.management.AttributeNotFoundException attribNotFoundExc) { - // TODO don't hard-wire attrib name - LOG.warn("no attribute matching `Version' found", attribNotFoundExc); - } catch (javax.management.MalformedObjectNameException badObjNameExc) { - LOG.warn("bad object name: " + mBeanName, badObjNameExc); - } catch (javax.management.InstanceNotFoundException instNotFoundExc) { - LOG.warn("no MBean instance found", instNotFoundExc); - } catch (javax.management.ReflectionException reflectExc) { - LOG.warn("reflection error!", reflectExc); - } catch (javax.management.MBeanException mBeanExc) { - LOG.warn("MBean error!", mBeanExc); - } catch (java.io.IOException ioExc) { - LOG.debug("i/o error!", ioExc); - } - } - } - - private MBeanServerConnection getJMXEndpoint(String host, String port) { - MBeanServerConnection conn = null; - String urlPattern = null; - try { - urlPattern = - "service:jmx:rmi:///jndi/rmi://" - + host + ":" - + port - + "/jmxrmi"; - JMXServiceURL url = new JMXServiceURL(urlPattern); - JMXConnector connector = JMXConnectorFactory.connect(url); - conn = connector.getMBeanServerConnection(); - } catch (java.net.MalformedURLException badURLExc) { - LOG.debug("bad url: " + urlPattern, badURLExc); - } catch (java.io.IOException ioExc) { - LOG.debug("i/o error!", ioExc); - } - return conn; - } - /* debug-- - public void testHello() { - LOG.info("hello!"); - }*/ - - //@Test + // Trivial @Test public void testNameNodePing() throws IOException { LOG.info("testing filesystem ping"); NNClient namenode = dfsCluster.getNNClient(); @@ -266,7 +104,7 @@ public class TestBalancer { LOG.info("done."); } - //@Test + // Trivial @Test public void testNameNodeConnectDisconnect() throws IOException { LOG.info("connecting to namenode"); NNClient namenode = dfsCluster.getNNClient(); @@ -294,7 +132,7 @@ public class TestBalancer { List testDNList = null; Path balancerTempDir = null; try { - DNClient[] datanodes = getReserveDNs(); + DNClient[] datanodes = getReserveDataNodes(); DNClient datanode1 = datanodes[0]; DNClient datanode2 = datanodes[1]; @@ -304,9 +142,8 @@ public class TestBalancer { while (iter.hasNext()) { try { DNClient dn = iter.next(); - // TODO kill doesn't work anymore - // TODO do a ssh to admin gateway and sudo yinst with command text do down a specific datanode - stopDN( dn ); + // kill doesn't work with secure-HDFS, so using our stopDataNode() method + stopDataNode( dn ); i++; } catch (Exception e) { LOG.info("error shutting down node " + i + ": " + e); @@ -315,37 +152,55 @@ public class TestBalancer { LOG.info("attempting to kill both test nodes"); // TODO add check to make sure there is enough capacity on these nodes to run test - stopDN(datanode1); - stopDN(datanode2); + stopDataNode(datanode1); + stopDataNode(datanode2); LOG.info("starting up datanode ["+ datanode1.getHostName()+ "] and loading it with data"); - startDN(datanode1); - - LOG.info("datanode " + datanode1.getHostName() - + " contains " + getDataNodeUsedSpace(datanode1) + " bytes"); + startDataNode(datanode1); + // TODO make an appropriate JMXListener interface + JMXListenerBean lsnr1 = JMXListenerBean.listenForDataNodeInfo(datanode1); + // mkdir balancer-temp balancerTempDir = makeTempDir(); // TODO write 2 blocks to file system LOG.info("generating filesystem load"); - generateFSLoad(2); // generate 2 blocks of test data + // TODO spec blocks to generate by blockCount, blockSize, # of writers + generateFileSystemLoad(2); // generate 2 blocks of test data LOG.info("measure space used on 1st node"); - long usedSpace0 = getDataNodeUsedSpace(datanode1); + long usedSpace0 = lsnr1.getDataNodeUsedSpace(); LOG.info("datanode " + datanode1.getHostName() + " contains " + usedSpace0 + " bytes"); LOG.info("bring up a 2nd node and run balancer on DFS"); - startDN(datanode2); + startDataNode(datanode2); runBalancer(); + JMXListenerBean lsnr2 = JMXListenerBean.listenForDataNodeInfo(datanode2); LOG.info("measure blocks and files on both nodes, assert these " + "counts are identical pre- and post-balancer run"); - long usedSpace1 = getDataNodeUsedSpace(datanode1); - long usedSpace2 = getDataNodeUsedSpace(datanode2); - Assert.assertEquals(usedSpace0, usedSpace1 + usedSpace2); - + long usedSpace1 = lsnr1.getDataNodeUsedSpace(); + long usedSpace2 = lsnr2.getDataNodeUsedSpace(); + long observedValue = usedSpace1 + usedSpace2; + long expectedValue = usedSpace0; + int errorTolerance = 10; + double toleranceValue = expectedValue * (errorTolerance/100.0); + String assertMsg = + String.format( + "The observed used space [%d] exceeds the expected "+ + "used space [%d] by more than %d%% tolerance [%.2f]", + observedValue, expectedValue, + errorTolerance, toleranceValue ); + Assert.assertTrue( + assertMsg, + withinTolerance(expectedValue, observedValue, errorTolerance) ); + LOG.info( String.format( + "The observed used space [%d] approximates expected "+ + "used space [%d] within %d%% tolerance [%.2f]", + observedValue, expectedValue, + errorTolerance, toleranceValue) ); } catch (Throwable t) { LOG.info("method testBalancer failed", t); } finally { @@ -362,13 +217,13 @@ public class TestBalancer { while (iter.hasNext()) { DNClient dn = iter.next(); - startDN( dn ); + startDataNode( dn ); } } } - /* Kill all datanodes but 2, return a list of the reserved datanodes */ - private DNClient[] getReserveDNs() { + /** Kill all datanodes but 2, return a list of the reserved datanodes */ + private DNClient[] getReserveDataNodes() { List testDNs = new LinkedList(); List dieDNs = new LinkedList(); LOG.info("getting collection of live data nodes"); @@ -401,10 +256,10 @@ public class TestBalancer { dieDNs.remove(testDN); LOG.info("nodes reserved for test"); - printDNList(testDNs); + printDataNodeList(testDNs); LOG.info("nodes not used in test"); - printDNList(dieDNs); + printDataNodeList(dieDNs); DNClient[] arr = new DNClient[]{}; return (DNClient[]) testDNs.toArray(arr); @@ -422,6 +277,21 @@ public class TestBalancer { } /** + * Calculate if the error in expected and observed values is within tolerance + * + * @param expectedValue expected value of experiment + * @param observedValue observed value of experiment + * @param tolerance per cent tolerance for error, represented as a int + */ + private boolean withinTolerance(long expectedValue, + long observedValue, + int tolerance) { + double diff = 1.0 * Math.abs(observedValue - expectedValue); + double thrs = expectedValue * (tolerance/100); + return diff > thrs; + } + + /** * Make a working directory for storing temporary files * * @throws IOException @@ -459,28 +329,27 @@ public class TestBalancer { srcFs.delete(temp, true); } - private void printDNList(List lis) { + private void printDataNodeList(List lis) { for (DNClient datanode : lis) { LOG.info("\t" + datanode.getHostName()); } } private final static String CMD_STOP_DN = "sudo yinst stop hadoop_datanode_admin"; - private void stopDN(DNClient dn) { + private void stopDataNode(DNClient dn) { String dnHost = dn.getHostName(); runAndWatch(dnHost, CMD_STOP_DN); } private final static String CMD_START_DN = "sudo yinst start hadoop_datanode_admin"; - private void startDN(DNClient dn) { + private void startDataNode(DNClient dn) { String dnHost = dn.getHostName(); runAndWatch(dnHost, CMD_START_DN); } /* using "old" default block size of 64M */ - private - static final int DFS_BLOCK_SIZE = 67108864; + private static final int DFS_BLOCK_SIZE = 67108864; - private void generateFSLoad(int numBlocks) { + private void generateFileSystemLoad(int numBlocks) { String destfile = "hdfs:///user/hadoopqa/" + BALANCER_TEMP_DIR + "/LOADGEN.DAT"; SecureRandom randgen = new SecureRandom(); ByteArrayOutputStream dat = null; @@ -520,8 +389,6 @@ public class TestBalancer { public final static String CMD_KINIT = "/usr/kerberos/bin/kinit"; public final static String CMD_HADOOP = HADOOP_HOME + "/bin/hadoop"; public final static String OPT_BALANCER = "balancer"; - // NOTE this shouldn't be hardwired - public final static String HOST_NAMENODE = "gsbl90277.blue.ygrid.yahoo.com"; public final static String KERB_KEYTAB = "/homes/hadoopqa/hadoopqa.dev.headless.keytab"; public final static String KERB_PRINCIPAL = "hadoopqa@DEV.YGRID.YAHOO.COM"; @@ -553,10 +420,228 @@ public class TestBalancer { new Thread(new StreamWatcher(in, out)).start(); } + static class JMXListenerBean { + + static final String OPTION_REMOTE_PORT = "-Dcom.sun.management.jmxremote.port"; + static final String HADOOP_JMX_SERVICE_NAME = "HadoopInfo"; + static final String HADOOP_JMX_INFO_DATANODE = "DataNodeInfo"; + + public static JMXListenerBean listenFor( + AbstractDaemonClient remoteDaemon, + String typeName) + throws + java.io.IOException, + InstanceNotFoundException { + String hostName = remoteDaemon.getHostName(); + int portNum = getJmxPortNumber(remoteDaemon); + ObjectName jmxBeanName = getJmxBeanName(typeName); + return new JMXListenerBean(hostName, portNum, jmxBeanName); + } + + public static JMXListenerBean listenForDataNodeInfo( + AbstractDaemonClient remoteDaemon) + throws + java.io.IOException, + InstanceNotFoundException { + return listenFor(remoteDaemon, HADOOP_JMX_INFO_DATANODE); + } + + private static int getJmxPortNumber(AbstractDaemonClient daemon) throws java.io.IOException { + String hadoopOpts = daemon.getProcessInfo().getEnv().get("HADOOP_OPTS"); + int portNumber = 0; + boolean found = false; + String[] options = hadoopOpts.split(" "); + for(String opt : options) { + if(opt.startsWith(OPTION_REMOTE_PORT)) { + found = true; + try { + portNumber = Integer.parseInt(opt.split("=")[1]); + } catch(NumberFormatException numFmtExc) { + throw new IllegalArgumentException("JMX remote port is not an integer"); + } catch(ArrayIndexOutOfBoundsException outOfBoundsExc) { + throw new IllegalArgumentException("JMX remote port not found"); + } + } + } + if (!found) { + String errMsg = + String.format("Cannot detect JMX remote port for %s daemon on host %s", + getDaemonType(daemon), + daemon.getHostName()); + throw new IllegalArgumentException(errMsg); + } + return portNumber; + } + + private static String getDaemonType(AbstractDaemonClient daemon) { + Class daemonClass = daemon.getClass(); + if (daemonClass.equals(DNClient.class)) + return "datanode"; + else if (daemonClass.equals(TTClient.class)) + return "tasktracker"; + else if (daemonClass.equals(NNClient.class)) + return "namenode"; + else if (daemonClass.equals(JTClient.class)) + return "jobtracker"; + else + return "unknown"; + } + + private MBeanServerConnection establishJmxConnection() { + MBeanServerConnection conn = null; + String urlPattern = String.format( + "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", + hostName, portNumber ); + try { + JMXServiceURL url = new JMXServiceURL(urlPattern); + JMXConnector connector = JMXConnectorFactory.connect(url,null); + conn = connector.getMBeanServerConnection(); + } catch(java.net.MalformedURLException badURLExc) { + LOG.debug("bad url: "+urlPattern, badURLExc); + } catch(java.io.IOException ioExc) { + LOG.debug("i/o error!", ioExc); + } + return conn; + } + + private static ObjectName getJmxBeanName(String typeName) { + ObjectName jmxBean = null; + String jmxRef = String.format( + "%s:type=%s", + HADOOP_JMX_SERVICE_NAME, typeName); + try { + jmxBean = new ObjectName(jmxRef); + } catch(MalformedObjectNameException badObjNameExc) { + LOG.debug("bad jmx name: "+jmxRef, badObjNameExc); + } + return jmxBean; + } + + private String hostName; + private int portNumber; + private ObjectName beanName; + + private JMXListenerBean(String hostName, int portNumber, ObjectName beanName) + throws + IOException, + InstanceNotFoundException { + //this.conn = conn; + this.hostName = hostName; + this.portNumber = portNumber; + this.beanName = beanName; + } + + private Object getAttribute(String attribName) + throws + javax.management.AttributeNotFoundException, + javax.management.InstanceNotFoundException, + javax.management.ReflectionException, + javax.management.MBeanException, + java.io.IOException { + + MBeanServerConnection conn = establishJmxConnection(); + return conn.getAttribute(beanName, attribName); + } + + private final static String TITLE_UBAR; + private final static String TOTAL_OBAR; + static { + char[] ubar1 = new char[100]; + Arrays.fill(ubar1, '='); + TITLE_UBAR = new String(ubar1); + Arrays.fill(ubar1, '-'); + TOTAL_OBAR = new String(ubar1); + } + + private void printVolInfo(Map volInfoMap) { + StringBuilder bldr = new StringBuilder(); + if (LOG.isDebugEnabled()) { + String spaceType = (String)volInfoMap.get("spaceType"); + String spaceTypeHeader = "Space "; + if(spaceType.startsWith("used")) { + spaceTypeHeader += "Used"; + } else { + spaceTypeHeader += "Free"; + } + String titleLine = String.format( + "%30s\t%20s\n%30s\t%20s", + "Volume", "Space "+spaceType, TITLE_UBAR, TITLE_UBAR); + bldr.append( titleLine ); + for (Object key : volInfoMap.keySet()) { + if ("total".equals(key)) + continue; + + Map attrMap = (Map) volInfoMap.get(key); + long usedSpace = (Long) attrMap.get(spaceType); + bldr.append(String.format("%30s\t%20s",key,usedSpace)); + } + String totalLine = String.format( + "%30s\t%20s\n%30s\t%20s", + TOTAL_OBAR, TOTAL_OBAR, "Total", volInfoMap.get("total")); + bldr.append(totalLine); + LOG.debug( bldr.toString() ); + } + } + + public Map processVolInfo(String spaceType) + throws + javax.management.AttributeNotFoundException, + javax.management.InstanceNotFoundException, + javax.management.ReflectionException, + javax.management.MBeanException, + java.io.IOException { + + Object volInfo = getAttribute("VolumeInfo"); + LOG.debug("retrieved volume info object " + volInfo); + Map info = (Map) JSON.parse(volInfo.toString()); + long total = 0L; + for (Object key : info.keySet()) { + Map attrMap = (Map) info.get(key); + long volAlloc = (Long) attrMap.get(spaceType); + LOG.info(String.format("volume %s has %d bytes space in use", key, volAlloc)); + total += volAlloc; + } + info.put("total", total); + info.put("spaceType", spaceType); + return info; + } + + public long getDataNodeUsedSpace() + throws + javax.management.AttributeNotFoundException, + javax.management.InstanceNotFoundException, + javax.management.ReflectionException, + javax.management.MBeanException, + java.io.IOException { + + LOG.debug("checking DFS space used on host " + hostName); + Map volInfoMap = processVolInfo("usedSpace"); + printVolInfo(volInfoMap); + long totalUsedSpace = Long.parseLong(volInfoMap.get("total").toString()); + return totalUsedSpace; + } + + public long getDataNodeFreeSpace() + throws + javax.management.AttributeNotFoundException, + javax.management.InstanceNotFoundException, + javax.management.ReflectionException, + javax.management.MBeanException, + java.io.IOException { + + LOG.debug("checking DFS space free on host " + hostName); + Map volInfoMap = processVolInfo("freeSpace"); + printVolInfo(volInfoMap); + long totalFreeSpace = Long.parseLong(volInfoMap.get("total").toString()); + return totalFreeSpace; + } + } + + /** simple utility to watch streams from an exec'ed process */ static class StreamWatcher implements Runnable { - BufferedReader reader; - PrintStream printer; + private BufferedReader reader; + private PrintStream printer; StreamWatcher(InputStream in, PrintStream out) { reader = getReader(in); @@ -577,6 +662,7 @@ public class TestBalancer { } } + /** simple utility to report progress in generating data */ static class ProgressReporter implements Progressable { StringBuffer buf = null;