Author: shv Date: Fri Oct 9 06:29:10 2009 New Revision: 823421 URL: http://svn.apache.org/viewvc?rev=823421&view=rev Log: HDFS-663. DFSIO for append. Contributed by Konstantin Shvachko. Modified: hadoop/hdfs/branches/branch-0.21/CHANGES.txt hadoop/hdfs/branches/branch-0.21/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java Modified: hadoop/hdfs/branches/branch-0.21/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/CHANGES.txt?rev=823421&r1=823420&r2=823421&view=diff ============================================================================== --- hadoop/hdfs/branches/branch-0.21/CHANGES.txt (original) +++ hadoop/hdfs/branches/branch-0.21/CHANGES.txt Fri Oct 9 06:29:10 2009 @@ -249,6 +249,8 @@ HDFS-518. Create new tests for Append's hflush. (Konstantin Boudnik via szetszwo) + HDFS-663. DFSIO for append. (shv) + BUG FIXES HDFS-76. Better error message to users when commands fail because of Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java?rev=823421&r1=823420&r2=823421&view=diff ============================================================================== --- hadoop/hdfs/branches/branch-0.21/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java (original) +++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java Fri Oct 9 06:29:10 2009 @@ -34,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -74,6 +75,7 @@ private static final int TEST_TYPE_READ = 0; private static final int TEST_TYPE_WRITE = 1; private static final int TEST_TYPE_CLEANUP = 2; + private static final int TEST_TYPE_APPEND = 3; private static final int DEFAULT_BUFFER_SIZE = 1000000; private static final String BASE_FILE_NAME = "test_io_"; private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log"; @@ -83,6 +85,7 @@ private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control"); private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write"); private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); + private static Path APPEND_DIR = new Path(TEST_ROOT_DIR, "io_append"); private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data"); static{ @@ -96,7 +99,7 @@ * @throws Exception */ public void testIOs() throws Exception { - testIOs(10, 10, new Configuration()); + testIOs(1, 4, new Configuration()); } /** @@ -108,13 +111,32 @@ */ public static void testIOs(int fileSize, int nrFiles, Configuration fsConfig) throws IOException { + fsConfig.setBoolean("dfs.support.append", true); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster(fsConfig, 2, true, null); + FileSystem fs = cluster.getFileSystem(); + + createControlFile(fs, fileSize, nrFiles, fsConfig); + long tStart = System.currentTimeMillis(); + writeTest(fs, fsConfig); + long execTime = System.currentTimeMillis() - tStart; + analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME); - FileSystem fs = FileSystem.get(fsConfig); + tStart = System.currentTimeMillis(); + readTest(fs, fsConfig); + execTime = System.currentTimeMillis() - tStart; + analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME); + + tStart = System.currentTimeMillis(); + appendTest(fs, fsConfig); + execTime = System.currentTimeMillis() - tStart; + analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME); - createControlFile(fs, fileSize, nrFiles, fsConfig); - writeTest(fs, fsConfig); - readTest(fs, fsConfig); - cleanup(fs); + cleanup(fs); + } finally { + if(cluster != null) cluster.shutdown(); + } } private static void createControlFile(FileSystem fs, @@ -255,6 +277,48 @@ } /** + * Append mapper class. + */ + public static class AppendMapper extends IOStatMapper { + + public AppendMapper() { + for(int i=0; i < bufferSize; i++) + buffer[i] = (byte)('0' + i % 50); + } + + public Long doIO(Reporter reporter, + String name, + long totalSize + ) throws IOException { + // create file + totalSize *= MEGA; + OutputStream out; + out = fs.append(new Path(DATA_DIR, name), bufferSize); + + try { + // write to the file + long nrRemaining; + for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { + int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; + out.write(buffer, 0, curSize); + reporter.setStatus("writing " + name + "@" + + (totalSize - nrRemaining) + "/" + totalSize + + " ::host = " + hostName); + } + } finally { + out.close(); + } + return Long.valueOf(totalSize); + } + } + + private static void appendTest(FileSystem fs, Configuration fsConfig) + throws IOException { + fs.delete(APPEND_DIR, true); + runIOTest(AppendMapper.class, APPEND_DIR, fsConfig); + } + + /** * Read mapper class. */ public static class ReadMapper extends IOStatMapper { @@ -301,6 +365,8 @@ ioer = new ReadMapper(); else if (testType == TEST_TYPE_WRITE) ioer = new WriteMapper(); + else if (testType == TEST_TYPE_APPEND) + ioer = new AppendMapper(); else return; for(int i=0; i < nrFiles; i++) @@ -318,9 +384,11 @@ boolean isSequential = false; String className = TestDFSIO.class.getSimpleName(); - String version = className + ".0.0.4"; - String usage = "Usage: " + className + " -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] "; - + String version = className + ".0.0.5"; + String usage = "Usage: " + className + + " -read | -write | -append | -clean [-nrFiles N] [-fileSize MB]" + + " [-resFile resultFileName] [-bufferSize Bytes]"; + System.out.println(version); if (args.length == 0) { System.err.println(usage); @@ -331,6 +399,8 @@ testType = TEST_TYPE_READ; } else if (args[i].equals("-write")) { testType = TEST_TYPE_WRITE; + } else if (args[i].equals("-append")) { + testType = TEST_TYPE_APPEND; } else if (args[i].equals("-clean")) { testType = TEST_TYPE_CLEANUP; } else if (args[i].startsWith("-seq")) { @@ -353,6 +423,7 @@ try { Configuration fsConfig = new Configuration(); fsConfig.setInt("test.io.file.buffer.size", bufferSize); + fsConfig.setBoolean("dfs.support.append", true); FileSystem fs = FileSystem.get(fsConfig); if (isSequential) { @@ -373,6 +444,8 @@ writeTest(fs, fsConfig); if (testType == TEST_TYPE_READ) readTest(fs, fsConfig); + if (testType == TEST_TYPE_APPEND) + appendTest(fs, fsConfig); long execTime = System.currentTimeMillis() - tStart; analyzeResult(fs, testType, execTime, resFileName); @@ -390,7 +463,9 @@ Path reduceFile; if (testType == TEST_TYPE_WRITE) reduceFile = new Path(WRITE_DIR, "part-00000"); - else + else if (testType == TEST_TYPE_APPEND) + reduceFile = new Path(APPEND_DIR, "part-00000"); + else // if (testType == TEST_TYPE_READ) reduceFile = new Path(READ_DIR, "part-00000"); long tasks = 0; long size = 0; @@ -427,6 +502,7 @@ String resultLines[] = { "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" : (testType == TEST_TYPE_READ) ? "read" : + (testType == TEST_TYPE_APPEND) ? "append" : "unknown"), " Date & time: " + new Date(System.currentTimeMillis()), " Number of files: " + tasks,