Author: dhruba
Date: Tue Jul 31 16:57:48 2007
New Revision: 561603
URL: http://svn.apache.org/viewvc?view=rev&rev=561603
Log:
HADOOP-999. A HDFS Client immediately informs the NameNode of a new
file creation. ClientProtocol version changed from 14 to 15.
(Tsz Wo (Nicholas), SZE via dhruba)
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jul 31 16:57:48 2007
@@ -5,6 +5,10 @@
INCOMPATIBLE CHANGES
+ HADOOP-999. A HDFS Client immediately informs the NameNode of a new
+ file creation. ClientProtocol version changed from 14 to 15.
+ (Tsz Wo (Nicholas), SZE via dhruba)
+
NEW FEATURES
HADOOP-1636. Allow configuration of the number of jobs kept in
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Tue Jul 31 16:57:48 2007
@@ -31,9 +31,9 @@
/**
* Compared to the previous version the following changes have been introduced:
- * 14: distributedUpgradeProgress() added.
+ * 15: create(...) should only create a file but not return block.
*/
- public static final long versionID = 14L;
+ public static final long versionID = 15L;
///////////////////////////////////////
// File contents
@@ -90,7 +90,7 @@
* create multi-block files must also use reportWrittenBlock()
* and addBlock().
*/
- public LocatedBlock create(String src,
+ public void create(String src,
String clientName,
boolean overwrite,
short replication,
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Jul 31 16:57:48 2007
@@ -1396,9 +1396,7 @@
boolean closed = false;
private UTF8 src;
- private boolean overwrite;
private short replication;
- private boolean firstTime = true;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
private File backupFile;
@@ -1421,7 +1419,6 @@
) throws IOException {
super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
this.src = src;
- this.overwrite = overwrite;
this.replication = replication;
this.blockSize = blockSize;
this.buffersize = buffersize;
@@ -1441,6 +1438,8 @@
checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
bytesPerChecksum);
+ namenode.create(
+ src.toString(), clientName, overwrite, replication, blockSize);
}
private void openBackupStream() throws IOException {
@@ -1494,13 +1493,7 @@
do {
retry = false;
- LocatedBlock lb;
- if (firstTime) {
- lb = locateNewBlock();
- } else {
- lb = locateFollowingBlock(startTime);
- }
-
+ LocatedBlock lb = locateFollowingBlock(startTime);
block = lb.getBlock();
if (block.getNumBytes() < bytesWrittenToBlock) {
block.setNumBytes(bytesWrittenToBlock);
@@ -1524,12 +1517,7 @@
Thread.sleep(6000);
} catch (InterruptedException iex) {
}
- if (firstTime) {
- namenode.abandonFileInProgress(src.toString(),
- clientName);
- } else {
- namenode.abandonBlock(block, src.toString());
- }
+ namenode.abandonBlock(block, src.toString());
retry = true;
continue;
}
@@ -1549,14 +1537,8 @@
blockStream = out;
blockReplyStream = new DataInputStream(s.getInputStream());
} while (retry);
- firstTime = false;
}
- private LocatedBlock locateNewBlock() throws IOException {
- return namenode.create(src.toString(), clientName,
- overwrite, replication, blockSize);
- }
-
private LocatedBlock locateFollowingBlock(long start
) throws IOException {
int retries = 5;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Jul 31 16:57:48 2007
@@ -709,54 +709,7 @@
* @throws IOException if the filename is invalid
* {@link FSDirectory#isValidToCreate(String)}.
*/
- public LocatedBlock startFile(String src,
- String holder,
- String clientMachine,
- boolean overwrite,
- short replication,
- long blockSize
- ) throws IOException {
-
- //
- // Create file into pendingCreates and get the first blockId
- //
- Block newBlock = startFileInternal(src, holder, clientMachine,
- overwrite, replication,
- blockSize);
-
- //
- // Get the array of replication targets
- //
- try {
- DatanodeDescriptor clientNode =
- host2DataNodeMap.getDatanodeByHost(clientMachine);
- DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
- clientNode, null, blockSize);
- if (targets.length < this.minReplication) {
- if (clusterMap.getNumOfLeaves() == 0) {
- throw new IOException("Failed to create file " + src
- + " on client " + clientMachine
- + " because this cluster has no datanodes.");
- }
- throw new IOException("Failed to create file " + src
- + " on client " + clientMachine
- + " because there were not enough datanodes available. "
- + "Found " + targets.length
- + " datanodes but MIN_REPLICATION for the cluster is "
- + "configured to be "
- + this.minReplication
- + ".");
- }
- return new LocatedBlock(newBlock, targets, 0L);
-
- } catch (IOException ie) {
- NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
- + ie.getMessage());
- throw ie;
- }
- }
-
- public synchronized Block startFileInternal(String src,
+ synchronized void startFile(String src,
String holder,
String clientMachine,
boolean overwrite,
@@ -861,9 +814,6 @@
}
lease.startedCreate(src);
}
-
- // Create first block
- return allocateBlock(src);
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+ie.getMessage());
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Tue Jul 31 16:57:48 2007
@@ -291,7 +291,7 @@
/**
*/
- public LocatedBlock create(String src,
+ public void create(String src,
String clientName,
boolean overwrite,
short replication,
@@ -304,14 +304,9 @@
throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
- LocatedBlock result = namesystem.startFile(src,
- clientName,
- clientMachine,
- overwrite,
- replication,
- blockSize);
+ namesystem.startFile(
+ src, clientName, clientMachine, overwrite, replication, blockSize);
myMetrics.createFile();
- return result;
}
public boolean setReplication(String src,
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Tue Jul 31 16:57:48 2007
@@ -249,7 +249,15 @@
* Files are overwritten by default.
*/
public FSDataOutputStream create(Path f) throws IOException {
- return create(f, true,
+ return create(f, true);
+ }
+
+ /**
+ * Opens an FSDataOutputStream at the indicated Path.
+ */
+ public FSDataOutputStream create(Path f, boolean overwrite)
+ throws IOException {
+ return create(f, overwrite,
getConf().getInt("io.file.buffer.size", 4096),
getDefaultReplication(),
getDefaultBlockSize());
@@ -773,7 +781,19 @@
*/
public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
- FileUtil.copy(getLocal(getConf()), src, this, dst, delSrc, getConf());
+ copyFromLocalFile(delSrc, true, src, dst);
+ }
+
+ /**
+ * The src file is on the local disk. Add it to FS at
+ * the given dst name.
+ * delSrc indicates if the source should be removed
+ */
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+ Path src, Path dst)
+ throws IOException {
+ Configuration conf = getConf();
+ FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Tue Jul 31 16:57:48 2007
@@ -114,6 +114,15 @@
FileSystem dstFS, Path dst,
boolean deleteSource,
Configuration conf) throws IOException {
+ return copy(srcFS, src, dstFS, dst, deleteSource, true, conf);
+ }
+
+ /** Copy files between FileSystems. */
+ public static boolean copy(FileSystem srcFS, Path src,
+ FileSystem dstFS, Path dst,
+ boolean deleteSource,
+ boolean overwrite,
+ Configuration conf) throws IOException {
dst = checkDest(src.getName(), dstFS, dst);
if (srcFS.isDirectory(src)) {
@@ -124,12 +133,12 @@
Path contents[] = srcFS.listPaths(src);
for (int i = 0; i < contents.length; i++) {
copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getName()),
- deleteSource, conf);
+ deleteSource, overwrite, conf);
}
} else if (srcFS.isFile(src)) {
InputStream in = srcFS.open(src);
try {
- OutputStream out = dstFS.create(dst);
+ OutputStream out = dstFS.create(dst, overwrite);
copyContent(in, out, conf);
} finally {
in.close();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java Tue Jul 31 16:57:48 2007
@@ -109,7 +109,7 @@
if (src.toString().equals("-")) {
copyFromStdin(new Path(dstf));
} else {
- fs.copyFromLocalFile(src, new Path(dstf));
+ fs.copyFromLocalFile(false, false, src, new Path(dstf));
}
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java Tue Jul 31 16:57:48 2007
@@ -19,13 +19,13 @@
import junit.framework.TestCase;
import java.io.*;
+import java.security.*;
+import java.util.*;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FsShell;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.StringUtils;
-
/**
* This class tests commands from DFSShell.
*/
@@ -33,11 +33,141 @@
private static String TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp"))
.toString().replace(' ', '+');
+
+ static private Path writeFile(FileSystem fs, Path f) throws IOException {
+ DataOutputStream out = fs.create(f);
+ out.writeBytes("dhruba: " + f);
+ out.close();
+ assertTrue(fs.exists(f));
+ return f;
+ }
+
+ static private Path mkdir(FileSystem fs, Path p) throws IOException {
+ assertTrue(fs.mkdirs(p));
+ assertTrue(fs.exists(p));
+ assertTrue(fs.getFileStatus(p).isDir());
+ return p;
+ }
+
+ static private File createLocalFile(File f) throws IOException {
+ assertTrue(!f.exists());
+ PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(f)));
+ out.println(f.getAbsolutePath());
+ out.close();
+ assertTrue(f.exists());
+ assertTrue(f.isFile());
+ return f;
+ }
+
+ static void show(String s) {
+ System.out.println(Thread.currentThread().getStackTrace()[2] + " " + s);
+ }
+
+ public void testZeroSizeFile() throws IOException {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+ FileSystem fs = cluster.getFileSystem();
+ assertTrue("Not a HDFS: "+fs.getUri(),
+ fs instanceof DistributedFileSystem);
+ final DistributedFileSystem dfs = (DistributedFileSystem)fs;
+
+ try {
+ //create a zero size file
+ final File f1 = new File(TEST_ROOT_DIR, "f1");
+ assertTrue(!f1.exists());
+ assertTrue(f1.createNewFile());
+ assertTrue(f1.exists());
+ assertTrue(f1.isFile());
+ assertEquals(0L, f1.length());
+
+ //copy to remote
+ final Path root = mkdir(dfs, new Path("/test/zeroSizeFile"));
+ final Path remotef = new Path(root, "dst");
+ show("copy local " + f1 + " to remote " + remotef);
+ dfs.copyFromLocalFile(false, false, new Path(f1.getPath()), remotef);
+
+ //getBlockSize() should not throw exception
+ show("Block size = " + dfs.getFileStatus(remotef).getBlockSize());
+
+ //copy back
+ final File f2 = new File(TEST_ROOT_DIR, "f2");
+ assertTrue(!f2.exists());
+ dfs.copyToLocalFile(remotef, new Path(f2.getPath()));
+ assertTrue(f2.exists());
+ assertTrue(f2.isFile());
+ assertEquals(0L, f2.length());
- private void writeFile(FileSystem fileSys, Path name) throws IOException {
- DataOutputStream stm = fileSys.create(name);
- stm.writeBytes("dhruba: " + name);
- stm.close();
+ f1.delete();
+ f2.delete();
+ } finally {
+ try {dfs.close();} catch (Exception e) {}
+ cluster.shutdown();
+ }
+ }
+
+ public void testPut() throws IOException {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+ FileSystem fs = cluster.getFileSystem();
+ assertTrue("Not a HDFS: "+fs.getUri(),
+ fs instanceof DistributedFileSystem);
+ final DistributedFileSystem dfs = (DistributedFileSystem)fs;
+
+ try {
+ final File f1 = createLocalFile(new File(TEST_ROOT_DIR, "f1"));
+ final File f2 = createLocalFile(new File(TEST_ROOT_DIR, "f2"));
+
+ final Path root = mkdir(dfs, new Path("/test/put"));
+ final Path dst = new Path(root, "dst");
+
+ show("begin");
+
+ final Thread copy2ndFileThread = new Thread() {
+ public void run() {
+ try {
+ show("copy local " + f2 + " to remote " + dst);
+ dfs.copyFromLocalFile(false, false, new Path(f2.getPath()), dst);
+ } catch (IOException ioe) {
+ show("good " + StringUtils.stringifyException(ioe));
+ return;
+ }
+ //should not be here, must got IOException
+ assertTrue(false);
+ }
+ };
+
+ //use SecurityManager to pause the copying of f1 and begin copying f2
+ System.setSecurityManager(new SecurityManager() {
+ private boolean firstTime = true;
+
+ public void checkPermission(Permission perm) {
+ if (firstTime) {
+ Thread t = Thread.currentThread();
+ if (!t.toString().contains("DataNode")) {
+ String s = "" + Arrays.asList(t.getStackTrace());
+ if (s.contains("FileUtil.copyContent")) {
+ //pause at FileUtil.copyContent
+
+ firstTime = false;
+ copy2ndFileThread.start();
+ try {Thread.sleep(5000);} catch (InterruptedException e) {}
+ }
+ }
+ }
+ }
+ });
+ show("copy local " + f1 + " to remote " + dst);
+ dfs.copyFromLocalFile(false, false, new Path(f1.getPath()), dst);
+ show("done");
+
+ try {copy2ndFileThread.join();} catch (InterruptedException e) { }
+ System.setSecurityManager(null);
+ f1.delete();
+ f2.delete();
+ } finally {
+ try {dfs.close();} catch (Exception e) {}
+ cluster.shutdown();
+ }
}
public void testCopyToLocal() throws IOException {
@@ -63,33 +193,14 @@
// + sub
// |- f3
// |- f4
- Path root = new Path("/test/copyToLocal");
- assertTrue(dfs.mkdirs(root));
- assertTrue(dfs.exists(root));
- assertTrue(dfs.isDirectory(root));
-
- Path sub = new Path(root, "sub");
- assertTrue(dfs.mkdirs(sub));
- assertTrue(dfs.exists(sub));
- assertTrue(dfs.isDirectory(sub));
-
- Path f1 = new Path(root, "f1");
- writeFile(dfs, f1);
- assertTrue(dfs.exists(f1));
-
- Path f2 = new Path(root, "f2");
- writeFile(dfs, f2);
- assertTrue(dfs.exists(f2));
-
- Path f3 = new Path(sub, "f3");
- writeFile(dfs, f3);
- assertTrue(dfs.exists(f3));
-
- Path f4 = new Path(sub, "f4");
- writeFile(dfs, f4);
- assertTrue(dfs.exists(f4));
- }
+ Path root = mkdir(dfs, new Path("/test/copyToLocal"));
+ Path sub = mkdir(dfs, new Path(root, "sub"));
+ writeFile(fs, new Path(root, "f1"));
+ writeFile(fs, new Path(root, "f2"));
+ writeFile(fs, new Path(sub, "f3"));
+ writeFile(fs, new Path(sub, "f4"));
+ }
// Verify copying the tree
{
@@ -111,10 +222,10 @@
assertTrue("Copying failed.", sub.isDirectory());
File f3 = new File(sub, "f3");
- assertTrue("Copying failed.", f3.exists());
+ assertTrue("Copying failed.", f3.isFile());
File f4 = new File(sub, "f4");
- assertTrue("Copying failed.", f4.exists());
+ assertTrue("Copying failed.", f4.isFile());
f1.delete();
f2.delete();
|