From: szetszwo@apache.org
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org
Subject: svn commit: r1366293 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/web/ src/test/java/org/apache/hadoop/hdfs/web/
Date: Fri, 27 Jul 2012 06:27:09 -0000
Message-Id: <20120727062709.7BA8F23888EA@eris.apache.org>

Author: szetszwo
Date: Fri Jul 27 06:27:08 2012
New Revision: 1366293

URL: http://svn.apache.org/viewvc?rev=1366293&view=rev
Log:
HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations
to get around a Java library bug causing OutOfMemoryError.

Added:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1366293&r1=1366292&r2=1366293&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jul 27 06:27:08 2012
@@ -83,6 +83,9 @@ Release 0.23.3 - UNRELEASED

     HDFS-3626. Creating file with invalid path can corrupt edit log (todd)

+    HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations
+    to get around a Java library bug causing OutOfMemoryError.  (szetszwo)
+
 Release 0.23.2 - UNRELEASED

   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1366293&r1=1366292&r2=1366293&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Fri Jul 27 06:27:08 2012
@@ -412,6 +412,7 @@ public class WebHdfsFileSystem extends F
       //Step 2) Submit another Http request with the URL from the Location header with data.
       conn = (HttpURLConnection)new URL(redirect).openConnection();
       conn.setRequestMethod(op.getType().toString());
+      conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
       return conn;
     }

@@ -824,8 +825,7 @@ public class WebHdfsFileSystem extends F
     }

     private static WebHdfsFileSystem getWebHdfs(
-        final Token<?> token, final Configuration conf
-        ) throws IOException, InterruptedException, URISyntaxException {
+        final Token<?> token, final Configuration conf) throws IOException {
       final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token);
       final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);

@@ -839,12 +839,7 @@ public class WebHdfsFileSystem extends F

       // update the kerberos credentials, if they are coming from a keytab
       ugi.reloginFromKeytab();
-      try {
-        WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
-        return webhdfs.renewDelegationToken(token);
-      } catch (URISyntaxException e) {
-        throw new IOException(e);
-      }
+      return getWebHdfs(token, conf).renewDelegationToken(token);
     }

     @Override
@@ -854,12 +849,7 @@ public class WebHdfsFileSystem extends F

       // update the kerberos credentials, if they are coming from a keytab
       ugi.checkTGTAndReloginFromKeytab();
-      try {
-        final WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
-        webhdfs.cancelDelegationToken(token);
-      } catch (URISyntaxException e) {
-        throw new IOException(e);
-      }
+      getWebHdfs(token, conf).cancelDelegationToken(token);
     }
   }
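For background on the setChunkedStreamingMode change above: by default, java.net.HttpURLConnection buffers the entire request body in memory so that it can compute a Content-Length header, so a WebHDFS write of a few hundred megabytes can exhaust the heap with OutOfMemoryError. Enabling chunked streaming switches the request to Transfer-Encoding: chunked, and each chunk is sent as soon as it is written. The sketch below shows the technique in isolation; the class name, endpoint URL, and 200MB payload are illustrative assumptions, not values from the patch.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class ChunkedPutExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical target; a real WebHDFS write would PUT to the
    // datanode URL returned in the namenode's Location header.
    final URL url = new URL(
        "http://datanode.example.com:50075/webhdfs/v1/tmp/f?op=CREATE");
    final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT");
    conn.setDoOutput(true);

    // Without this call, HttpURLConnection buffers the whole body in
    // memory to compute Content-Length; with it, each 32kB chunk is
    // sent as soon as it is written.
    conn.setChunkedStreamingMode(32 << 10);

    final byte[] chunk = new byte[32 << 10];
    final OutputStream out = conn.getOutputStream();
    try {
      // Stream a 200MB body while only ever holding one 32kB chunk.
      for (long remaining = 200L << 20; remaining > 0; ) {
        final int n = (int) Math.min(remaining, chunk.length);
        out.write(chunk, 0, n);
        remaining -= n;
      }
    } finally {
      out.close();
    }
    System.out.println("HTTP " + conn.getResponseCode());
  }
}

The 32kB chunk size mirrors the value chosen in the patch; any bounded chunk size avoids whole-body buffering, at the cost of the server not receiving a Content-Length up front.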
Added: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java?rev=1366293&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java Fri Jul 27 06:27:08 2012
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test WebHDFS */
+public class TestWebHDFS {
+  static final Log LOG = LogFactory.getLog(TestWebHDFS.class);
+
+  static final Random RANDOM = new Random();
+
+  static final long systemStartTime = System.nanoTime();
+
+  /** A timer for measuring performance. */
+  static class Ticker {
+    final String name;
+    final long startTime = System.nanoTime();
+    private long previousTick = startTime;
+
+    Ticker(final String name, String format, Object... args) {
+      this.name = name;
+      LOG.info(String.format("\n\n%s START: %s\n",
+          name, String.format(format, args)));
+    }
+
+    void tick(final long nBytes, String format, Object... args) {
+      final long now = System.nanoTime();
+      if (now - previousTick > 10000000000L) {
+        previousTick = now;
+        final double minutes = (now - systemStartTime)/60000000000.0;
+        LOG.info(String.format("\n\n%s %.2f min) %s %s\n", name, minutes,
+            String.format(format, args), toMpsString(nBytes, now)));
+      }
+    }
+
+    void end(final long nBytes) {
+      final long now = System.nanoTime();
+      final double seconds = (now - startTime)/1000000000.0;
+      LOG.info(String.format("\n\n%s END: duration=%.2fs %s\n",
+          name, seconds, toMpsString(nBytes, now)));
+    }
+
+    String toMpsString(final long nBytes, final long now) {
+      final double mb = nBytes/(double)(1 << 20);
+      final double mps = mb*1000000000.0/(now - startTime);
+      return String.format("[nBytes=%.2fMB, speed=%.2fMB/s]", mb, mps);
+    }
+  }
+
+  @Test
+  public void testLargeFile() throws Exception {
+    largeFileTest(200L << 20); //200MB file length
+  }
+
+  /** Test read and write large files. */
+  static void largeFileTest(final long fileLength) throws Exception {
+    final Configuration conf = WebHdfsTestUtil.createConf();
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+
+      final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf);
+      final Path dir = new Path("/test/largeFile");
+      Assert.assertTrue(fs.mkdirs(dir));
+
+      final byte[] data = new byte[1 << 20];
+      RANDOM.nextBytes(data);
+
+      final byte[] expected = new byte[2 * data.length];
+      System.arraycopy(data, 0, expected, 0, data.length);
+      System.arraycopy(data, 0, expected, data.length, data.length);
+
+      final Path p = new Path(dir, "file");
+      final Ticker t = new Ticker("WRITE", "fileLength=" + fileLength);
+      final FSDataOutputStream out = fs.create(p);
+      try {
+        long remaining = fileLength;
+        for(; remaining > 0;) {
+          t.tick(fileLength - remaining, "remaining=%d", remaining);
+
+          final int n = (int)Math.min(remaining, data.length);
+          out.write(data, 0, n);
+          remaining -= n;
+        }
+      } finally {
+        out.close();
+      }
+      t.end(fileLength);
+
+      Assert.assertEquals(fileLength, fs.getFileStatus(p).getLen());
+
+      final long smallOffset = RANDOM.nextInt(1 << 20) + (1 << 20);
+      final long largeOffset = fileLength - smallOffset;
+      final byte[] buf = new byte[data.length];
+
+      verifySeek(fs, p, largeOffset, fileLength, buf, expected);
+      verifySeek(fs, p, smallOffset, fileLength, buf, expected);
+
+      verifyPread(fs, p, largeOffset, fileLength, buf, expected);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  static void checkData(long offset, long remaining, int n,
+      byte[] actual, byte[] expected) {
+    if (RANDOM.nextInt(100) == 0) {
+      int j = (int)(offset % actual.length);
+      for(int i = 0; i < n; i++) {
+        if (expected[j] != actual[i]) {
+          Assert.fail("expected[" + j + "]=" + expected[j]
+              + " != actual[" + i + "]=" + actual[i]
+              + ", offset=" + offset + ", remaining=" + remaining + ", n=" + n);
+        }
+        j++;
+      }
+    }
+  }
+
+  /** test seek */
+  static void verifySeek(FileSystem fs, Path p, long offset, long length,
+      byte[] buf, byte[] expected) throws IOException {
+    long remaining = length - offset;
+    long checked = 0;
+    LOG.info("XXX SEEK: offset=" + offset + ", remaining=" + remaining);
+
+    final Ticker t = new Ticker("SEEK", "offset=%d, remaining=%d",
+        offset, remaining);
+    final FSDataInputStream in = fs.open(p, 64 << 10);
+    in.seek(offset);
+    for(; remaining > 0; ) {
+      t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
+      final int n = (int)Math.min(remaining, buf.length);
+      in.readFully(buf, 0, n);
+      checkData(offset, remaining, n, buf, expected);
+
+      offset += n;
+      remaining -= n;
+      checked += n;
+    }
+    in.close();
+    t.end(checked);
+  }
+
+  static void verifyPread(FileSystem fs, Path p, long offset, long length,
+      byte[] buf, byte[] expected) throws IOException {
+    long remaining = length - offset;
+    long checked = 0;
+    LOG.info("XXX PREAD: offset=" + offset + ", remaining=" + remaining);
+
+    final Ticker t = new Ticker("PREAD", "offset=%d, remaining=%d",
+        offset, remaining);
+    final FSDataInputStream in = fs.open(p, 64 << 10);
+    for(; remaining > 0; ) {
+      t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
+      final int n = (int)Math.min(remaining, buf.length);
+      in.readFully(offset, buf, 0, n);
+      checkData(offset, remaining, n, buf, expected);
+
+      offset += n;
+      remaining -= n;
+      checked += n;
+    }
+    in.close();
+    t.end(checked);
+  }
+}

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java?rev=1366293&r1=1366292&r2=1366293&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java Fri Jul 27 06:27:08 2012
@@ -40,6 +40,12 @@ import org.junit.Assert;
 public class WebHdfsTestUtil {
   public static final Log LOG = LogFactory.getLog(WebHdfsTestUtil.class);

+  public static Configuration createConf() {
+    final Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+    return conf;
+  }
+
   public static WebHdfsFileSystem getWebHdfsFileSystem(final Configuration conf
       ) throws IOException, URISyntaxException {
     final String uri = WebHdfsFileSystem.SCHEME + "://"