From: cutting@apache.org
To: hadoop-commits@lucene.apache.org
Reply-To: hadoop-dev@lucene.apache.org
Subject: svn commit: r569012 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/ src/test/org/apache/hadoop/dfs/
Date: Thu, 23 Aug 2007 14:29:36 -0000
Message-Id: <20070823142937.F13471A981A@eris.apache.org>

Author: cutting
Date: Thu Aug 23 07:29:35 2007
New Revision: 569012

URL: http://svn.apache.org/viewvc?rev=569012&view=rev
Log:
HADOOP-1654. Add IOUtils. Contributed by Enis Soztutar.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/IOUtils.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=569012&r1=569011&r2=569012&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Aug 23 07:29:35 2007
@@ -88,6 +88,9 @@
   HADOOP-1744. Remove many uses of the deprecated UTF8 class from the
   HDFS namenode. (Christophe Taton via cutting)
 
+  HADOOP-1654. Add IOUtils class, containing generic io-related
+  utility methods.
(Enis Soztutar via cutting) + Release 0.14.0 - 2007-08-17 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java?rev=569012&r1=569011&r2=569012&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java Thu Aug 23 07:29:35 2007 @@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.io.retry.*; @@ -133,6 +132,7 @@ boolean offlineUpgrade; /** Returns string that has block id and the associated file */ + @Override public String toString() { return block + " (filename: " + ( (crcInfo == null || crcInfo.fileName == null) ? @@ -186,7 +186,7 @@ DFSClient.BlockReader reader = DFSClient.BlockReader.newBlockReader (dnSock, crcFile, blk.getBlockId(), offset, len, (int)Math.min(len, 4096)); - FileUtil.readFully(reader, buf, bufOffset, (int)len); + IOUtils.readFully(reader, buf, bufOffset, (int)len); return; } catch (IOException ioe) { LOG.warn("Could not read " + blk + " from " + dn.getName() + " : " + @@ -243,7 +243,7 @@ } in.readFully(buf, bufOffset, len); } finally { - FileUtil.closeStream(in); + IOUtils.closeStream(in); } } @@ -353,12 +353,12 @@ in = new FileInputStream( metaFile ); in.skip(7); //should be skipFully(). byte[] storedChecksum = new byte[ crcBuf.length ]; - FileUtil.readFully(in, storedChecksum, 0, storedChecksum.length); + IOUtils.readFully(in, storedChecksum, 0, storedChecksum.length); if ( !Arrays.equals(crcBuf, storedChecksum) ) { throw new IOException("CRC does not match"); } } finally { - FileUtil.closeStream(in); + IOUtils.closeStream(in); } return; } @@ -390,7 +390,7 @@ metaFile); } } finally { - FileUtil.closeStream(out); + IOUtils.closeStream(out); if ( tmpBlockFile != null ) { tmpBlockFile.delete(); } @@ -475,7 +475,7 @@ * But we are not optimizing for this case. 
*/ if ( toRead > 0 ) { - FileUtil.readFully(in, blockBuf, 0, toRead); + IOUtils.readFully(in, blockBuf, 0, toRead); } if ( (toRead == 0 && bytesAfter.length > 0) || toRead >= verifyLen ) { @@ -533,7 +533,7 @@ assert newCrcBuf.length == newCrcOffset : "something is wrong"; return newCrcBuf; } finally { - FileUtil.closeStream(in); + IOUtils.closeStream(in); } } @@ -811,7 +811,7 @@ while (totalRead < blockLen) { int toRead = Math.min((int)(blockLen - totalRead), bytesPerChecksum); - FileUtil.readFully(in, dataBuf, 0, toRead ); + IOUtils.readFully(in, dataBuf, 0, toRead ); checksum.update(dataBuf, 0, toRead); crcBufPos += checksum.writeValue(crcBuf, crcBufPos, true); @@ -820,7 +820,7 @@ totalRead += toRead; } } finally { - FileUtil.closeStream(in); + IOUtils.closeStream(in); } writeCrcData(blockInfo, bytesPerChecksum, crcBuf); @@ -902,7 +902,7 @@ } } while (false); } finally { - FileUtil.closeSocket( dnSock ); + IOUtils.closeSocket( dnSock ); } throw new IOException("Error while fetching CRC from replica on " + @@ -1139,6 +1139,7 @@ this.errors = errors; } + @Override public void readFields(DataInput in) throws IOException { super.readFields(in); datanodeId.readFields(in); @@ -1147,6 +1148,7 @@ errors = in.readInt(); } + @Override public void write(DataOutput out) throws IOException { super.write(out); datanodeId.write(out); @@ -1170,11 +1172,13 @@ block = blk; } + @Override public void readFields(DataInput in) throws IOException { super.readFields(in); block.readFields(in); } + @Override public void write(DataOutput out) throws IOException { super.write(out); block.write(out); @@ -1194,11 +1198,13 @@ crcInfo = info; } + @Override public void readFields(DataInput in) throws IOException { super.readFields(in); crcInfo.readFields(in); } + @Override public void write(DataOutput out) throws IOException { super.write(out); crcInfo.write(out); @@ -1305,6 +1311,7 @@ (short) Math.floor(blocksUpgraded*100.0/blocksToUpgrade); } + @Override public UpgradeCommand completeUpgrade() throws IOException { // return latest stats command. assert getUpgradeStatus() == 100; @@ -1375,6 +1382,7 @@ } } + @Override void doUpgrade() throws IOException { doUpgradeInternal(); } @@ -1661,6 +1669,7 @@ return (short) Math.floor(avgDatanodeCompletionPct * 0.9); } + @Override public UpgradeCommand startUpgrade() throws IOException { assert monitorThread == null; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=569012&r1=569011&r2=569012&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Aug 23 07:29:35 2007 @@ -79,6 +79,7 @@ clients.add(client); } + @Override public synchronized void run() { for (DFSClient client : clients) { if (client.running) { @@ -591,6 +592,7 @@ * because it first reads the data to user buffer and then checks * the checksum. */ + @Override public synchronized int read(byte[] buf, int off, int len) throws IOException { @@ -610,6 +612,7 @@ return super.read(buf, off, len); } + @Override public synchronized long skip(long n) throws IOException { /* How can we make sure we don't throw a ChecksumException, at least * in majority of the cases?. This one throws. 
*/ @@ -629,11 +632,13 @@ return nSkipped; } + @Override public int read() throws IOException { throw new IOException("read() is not expected to be invoked. " + "Use read(buf, off, len) instead."); } + @Override public boolean seekToNewSource(long targetPos) throws IOException { /* Checksum errors are handled outside the BlockReader. * DFSInputStream does not always call 'seekToNewSource'. In the @@ -642,15 +647,18 @@ return false; } + @Override public void seek(long pos) throws IOException { throw new IOException("Seek() is not supported in BlockInputChecker"); } + @Override protected long getChunkPosition(long pos) { throw new RuntimeException("getChunkPosition() is not supported, " + "since seek is not required"); } + @Override protected synchronized int readChunk(long pos, byte[] buf, int offset, int len, byte[] checksumBuf) throws IOException { @@ -690,11 +698,11 @@ if ( chunkLen > 0 ) { // len should be >= chunkLen - FileUtil.readFully(in, buf, offset, chunkLen); + IOUtils.readFully(in, buf, offset, chunkLen); } if ( checksumSize > 0 ) { - FileUtil.readFully(in, checksumBuf, 0, checksumSize); + IOUtils.readFully(in, checksumBuf, 0, checksumSize); } lastChunkOffset = chunkOffset; @@ -773,6 +781,7 @@ startOffset, firstChunkOffset ); } + @Override public synchronized void close() throws IOException { startOffset = -1; checksum = null; @@ -1005,6 +1014,7 @@ /** * Close it down! */ + @Override public synchronized void close() throws IOException { checkOpen(); if (closed) { @@ -1024,6 +1034,7 @@ closed = true; } + @Override public synchronized int read() throws IOException { int ret = read( oneByteBuf, 0, 1 ); return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff); @@ -1062,6 +1073,7 @@ /** * Read the entire buffer. */ + @Override public synchronized int read(byte buf[], int off, int len) throws IOException { checkOpen(); if (closed) { @@ -1197,6 +1209,7 @@ * * @return actual number of bytes read */ + @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { // sanity checks @@ -1230,6 +1243,7 @@ return realLen; } + @Override public long skip(long n) throws IOException { if ( n > 0 ) { long curPos = getPos(); @@ -1246,6 +1260,7 @@ /** * Seek to a new arbitrary location */ + @Override public synchronized void seek(long targetPos) throws IOException { if (targetPos > getFileLength()) { throw new IOException("Cannot seek after EOF"); @@ -1276,6 +1291,7 @@ * a node other than the current node is found, then returns true. * If another node could not be found, then returns false. 
*/ + @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException { boolean markedDead = deadNodes.containsKey(currentNode); addToDeadNodes(currentNode); @@ -1296,12 +1312,14 @@ /** */ + @Override public synchronized long getPos() throws IOException { return pos; } /** */ + @Override public synchronized int available() throws IOException { if (closed) { throw new IOException("Stream closed"); @@ -1312,11 +1330,14 @@ /** * We definitely don't support marks */ + @Override public boolean markSupported() { return false; } + @Override public void mark(int readLimit) { } + @Override public void reset() throws IOException { throw new IOException("Mark/reset not supported"); } @@ -1537,6 +1558,7 @@ } // @see FSOutputSummer#writeChunk() + @Override protected void writeChunk(byte[] b, int offset, int len, byte[] checksum) throws IOException { checkOpen(); @@ -1599,7 +1621,7 @@ while ( bytesLeft >= 0 ) { int len = (int) Math.min( bytesLeft, bytesPerChecksum ); if ( len > 0 ) { - FileUtil.readFully( in, buf, 0, len + checksumSize); + IOUtils.readFully( in, buf, 0, len + checksumSize); } blockStream.writeInt( len ); @@ -1680,6 +1702,7 @@ * Closes this output stream and releases any system * resources associated with this stream. */ + @Override public synchronized void close() throws IOException { checkOpen(); if (closed) { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=569012&r1=569011&r2=569012&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Aug 23 07:29:35 2007 @@ -20,6 +20,7 @@ import org.apache.commons.logging.*; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.metrics.MetricsUtil; @@ -32,8 +33,6 @@ import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.dfs.BlockCommand; import org.apache.hadoop.dfs.DatanodeProtocol; -import org.apache.hadoop.fs.FileUtil; - import java.io.*; import java.net.*; import java.util.*; @@ -449,6 +448,7 @@ Count(int init) { value = init; } synchronized void incr() { value++; } synchronized void decr() { value--; } + @Override public String toString() { return Integer.toString(value); } public int getValue() { return value; } } @@ -1063,7 +1063,7 @@ } byte [] buf = new byte[(int)fileSize]; - FileUtil.readFully(checksumIn, buf, 0, buf.length); + IOUtils.readFully(checksumIn, buf, 0, buf.length); out = new DataOutputStream(s.getOutputStream()); @@ -1074,7 +1074,7 @@ //last DATA_CHUNK out.writeInt(0); } finally { - FileUtil.closeStream(checksumIn); + IOUtils.closeStream(checksumIn); } } } @@ -1171,7 +1171,7 @@ blockInFile.seek(offset); if (checksumSkip > 0) { //Should we use seek() for checksum file as well? - FileUtil.skipFully(checksumIn, checksumSkip); + IOUtils.skipFully(checksumIn, checksumSkip); } } @@ -1215,7 +1215,7 @@ LOG.warn( " Could not read checksum for data at offset " + offset + " for block " + block + " got : " + StringUtils.stringifyException(e) ); - FileUtil.closeStream( checksumIn ); + IOUtils.closeStream( checksumIn ); checksumIn = null; if ( corruptChecksumOk ) { // Just fill the array with zeros. 
@@ -1238,10 +1238,10 @@ offset += len; } } finally { - FileUtil.closeStream( blockInFile ); - FileUtil.closeStream( checksumIn ); - FileUtil.closeStream( blockIn ); - FileUtil.closeStream( out ); + IOUtils.closeStream( blockInFile ); + IOUtils.closeStream( checksumIn ); + IOUtils.closeStream( blockIn ); + IOUtils.closeStream( out ); } return totalRead; @@ -1285,7 +1285,7 @@ targets[0].getName() + " got " + StringUtils.stringifyException( ie ) ); } finally { - FileUtil.closeSocket(sock); + IOUtils.closeSocket(sock); xmitsInProgress--; } } @@ -1393,6 +1393,7 @@ return null; } + @Override public String toString() { return "DataNode{" + "data=" + data + @@ -1465,6 +1466,7 @@ // read & log any error messages from the running script Thread errThread = new Thread() { + @Override public void start() { try { String errLine = errR.readLine(); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?rev=569012&r1=569011&r2=569012&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Thu Aug 23 07:29:35 2007 @@ -22,9 +22,8 @@ import java.util.Enumeration; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; -import java.net.Socket; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.StringUtils; /** @@ -137,12 +136,8 @@ } } else if (srcFS.isFile(src)) { InputStream in = srcFS.open(src); - try { - OutputStream out = dstFS.create(dst, overwrite); - copyContent(in, out, conf); - } finally { - in.close(); - } + OutputStream out = dstFS.create(dst, overwrite); + IOUtils.copyBytes(in, out, conf, true); } else { throw new IOException(src.toString() + ": No such file or directory"); } @@ -172,7 +167,7 @@ if (srcFS.isFile(contents[i])) { InputStream in = srcFS.open(contents[i]); try { - copyContent(in, out, conf, false); + IOUtils.copyBytes(in, out, conf, false); if (addString!=null) out.write(addString.getBytes("UTF-8")); @@ -211,11 +206,7 @@ } } else if (src.isFile()) { InputStream in = new FileInputStream(src); - try { - copyContent(in, dstFS.create(dst), conf); - } finally { - in.close(); - } + IOUtils.copyBytes(in, dstFS.create(dst), conf); } if (deleteSource) { return FileUtil.fullyDelete(src); @@ -239,11 +230,7 @@ } } else if (srcFS.isFile(src)) { InputStream in = srcFS.open(src); - try { - copyContent(in, new FileOutputStream(dst), conf); - } finally { - in.close(); - } + IOUtils.copyBytes(in, new FileOutputStream(dst), conf); } if (deleteSource) { return srcFS.delete(src); @@ -252,27 +239,6 @@ } } - private static void copyContent(InputStream in, OutputStream out, - Configuration conf) throws IOException { - copyContent(in, out, conf, true); - } - - - private static void copyContent(InputStream in, OutputStream out, - Configuration conf, boolean close) throws IOException { - byte buf[] = new byte[conf.getInt("io.file.buffer.size", 4096)]; - try { - int bytesRead = in.read(buf); - while (bytesRead >= 0) { - out.write(buf, 0, bytesRead); - bytesRead = in.read(buf); - } - } finally { - if (close) - out.close(); - } - } - private static Path checkDest(String srcName, FileSystem dstFS, Path dst) throws IOException { if (dstFS.exists(dst)) { @@ -519,51 +485,5 @@ tmp.deleteOnExit(); } return tmp; - } - - //XXX These functions should be in IO Utils 
rather than FileUtil - // Reads len bytes in a loop. - public static void readFully( InputStream in, byte buf[], - int off, int len ) throws IOException { - int toRead = len; - while ( toRead > 0 ) { - int ret = in.read( buf, off, toRead ); - if ( ret < 0 ) { - throw new IOException( "Premeture EOF from inputStream"); - } - toRead -= ret; - off += ret; - } - } - - public static void skipFully( InputStream in, long len ) throws IOException { - long toSkip = len; - while ( toSkip > 0 ) { - long ret = in.skip( toSkip ); - if ( ret < 0 ) { - throw new IOException( "Premeture EOF from inputStream"); - } - toSkip -= ret; - } - } - - public static void closeSocket( Socket sock ) { - // avoids try { close() } dance - if ( sock != null ) { - try { - sock.close(); - } catch ( IOException ignored ) { - } - } - } - - public static void closeStream(Closeable closeable ) { - // avoids try { close() } dance - if ( closeable != null ) { - try { - closeable.close(); - } catch ( IOException ignored ) { - } - } } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java?rev=569012&r1=569011&r2=569012&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java Thu Aug 23 07:29:35 2007 @@ -17,13 +17,15 @@ */ package org.apache.hadoop.fs; -import java.io.*; +import java.io.File; +import java.io.IOException; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.Tool; @@ -67,22 +69,7 @@ } } - /** - * Copies from one stream to another. - */ - private void copyBytes(InputStream in, OutputStream out) throws IOException { - PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; - byte buf[] = new byte[getConf().getInt("io.file.buffer.size", 4096)]; - int bytesRead = in.read(buf); - while (bytesRead >= 0) { - out.write(buf, 0, bytesRead); - if ((ps != null) && ps.checkError()) { - throw new IOException("Unable to write to output stream."); - } - bytesRead = in.read(buf); - } - } - + /** * Copies from stdin to the indicated file. */ @@ -95,8 +82,9 @@ } FSDataOutputStream out = fs.create(dst); try { - copyBytes(System.in, out); - } finally { + IOUtils.copyBytes(System.in, out, getConf(), false); + } + finally { out.close(); } } @@ -109,12 +97,7 @@ throw new IOException("Source must be a file."); } FSDataInputStream in = fs.open(src); - try { - copyBytes(in, System.out); - } finally { - in.close(); - } - + IOUtils.copyBytes(in, System.out, getConf(), true); } /** Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/IOUtils.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/IOUtils.java?rev=569012&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/IOUtils.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/IOUtils.java Thu Aug 23 07:29:35 2007 @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.*;
+import java.net.Socket;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A utility class for I/O related functionality.
+ */
+public class IOUtils {
+
+  /**
+   * Copies from one stream to another.
+   * @param in InputStream to read from
+   * @param out OutputStream to write to
+   * @param buffSize the size of the buffer
+   * @param close whether or not to close the InputStream and
+   * OutputStream at the end. The streams are closed in the finally clause.
+   */
+  public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
+    throws IOException {
+
+    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
+    byte buf[] = new byte[buffSize];
+    try {
+      int bytesRead = in.read(buf);
+      while (bytesRead >= 0) {
+        out.write(buf, 0, bytesRead);
+        if ((ps != null) && ps.checkError()) {
+          throw new IOException("Unable to write to output stream.");
+        }
+        bytesRead = in.read(buf);
+      }
+    } finally {
+      if(close) {
+        out.close();
+        in.close();
+      }
+    }
+  }
+
+  /**
+   * Copies from one stream to another. Closes the input and output streams
+   * at the end.
+   * @param in InputStream to read from
+   * @param out OutputStream to write to
+   * @param conf the Configuration object
+   */
+  public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
+    throws IOException {
+    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
+  }
+
+  /**
+   * Copies from one stream to another.
+   * @param in InputStream to read from
+   * @param out OutputStream to write to
+   * @param conf the Configuration object
+   * @param close whether or not to close the InputStream and
+   * OutputStream at the end. The streams are closed in the finally clause.
+   */
+  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
+    throws IOException {
+    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close);
+  }
+
+  /** Reads len bytes in a loop.
+   * @param in The InputStream to read from
+   * @param buf The buffer to fill
+   * @param off offset into the buffer
+   * @param len the length of bytes to read
+   * @throws IOException if it could not read requested number of bytes
+   * for any reason (including EOF)
+   */
+  public static void readFully( InputStream in, byte buf[],
+      int off, int len ) throws IOException {
+    int toRead = len;
+    while ( toRead > 0 ) {
+      int ret = in.read( buf, off, toRead );
+      if ( ret < 0 ) {
+        throw new IOException( "Premeture EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }
+  }
+
+  /** Similar to readFully(). Skips bytes in a loop.
+   * @param in The InputStream to skip bytes from
+   * @param len number of bytes to skip.
+   * @throws IOException if it could not skip requested number of bytes
+   * for any reason (including EOF)
+   */
+  public static void skipFully( InputStream in, long len ) throws IOException {
+    while ( len > 0 ) {
+      long ret = in.skip( len );
+      if ( ret < 0 ) {
+        throw new IOException( "Premeture EOF from inputStream");
+      }
+      len -= ret;
+    }
+  }
+
+  /**
+   * Closes the stream ignoring {@link IOException}
+   * @param stream the Stream to close
+   */
+  public static void closeStream( java.io.Closeable stream ) {
+    // avoids try { close() } dance
+    if ( stream != null ) {
+      try {
+        stream.close();
+      } catch ( IOException ignored ) {
+      }
+    }
+  }
+
+  /**
+   * Closes the socket ignoring {@link IOException}
+   * @param sock the Socket to close
+   */
+  public static void closeSocket( Socket sock ) {
+    // avoids try { close() } dance
+    if ( sock != null ) {
+      try {
+        sock.close();
+      } catch ( IOException ignored ) {
+      }
+    }
+  }
+}

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=569012&r1=569011&r2=569012&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Thu Aug 23 07:29:35 2007
@@ -27,7 +27,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.dfs.DFSClient.DFSDataInputStream;
 import org.apache.commons.logging.Log;
@@ -95,7 +95,7 @@
         assertEquals("checking byte[" + i + "]", recvBuf[i], retBuf[i]);
       }
     } finally {
-      FileUtil.closeSocket(sock);
+      IOUtils.closeSocket(sock);
     }
   }
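
As a minimal usage sketch of the new API (not part of the patch above; the class name, file paths, and buffer contents below are illustrative assumptions), callers that previously went through the FileUtil helpers would now use IOUtils roughly like this:

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;

    // Illustrative example only; class name and paths are made up.
    public class IOUtilsUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Copy one stream to another; the buffer size comes from
        // io.file.buffer.size and both streams are closed in a finally clause.
        InputStream in = new FileInputStream("/tmp/src");       // illustrative path
        OutputStream out = new FileOutputStream("/tmp/dst");    // illustrative path
        IOUtils.copyBytes(in, out, conf, true);

        // Read an exact number of bytes or fail, then close the stream while
        // ignoring any IOException -- the pattern DFSClient and DataNode
        // switch to in this commit.
        byte[] header = new byte[16];
        InputStream meta = new FileInputStream("/tmp/src");     // illustrative path
        try {
          IOUtils.readFully(meta, header, 0, header.length);
        } finally {
          IOUtils.closeStream(meta);
        }
      }
    }

The closeStream and closeSocket helpers exist only to avoid repeating the try/close/ignore-IOException dance in finally blocks, as the comments in the new class note.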