hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r803969 - in /hadoop/hdfs/branches/HDFS-265: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSClient.java src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
Date Thu, 13 Aug 2009 18:05:02 GMT
Author: hairong
Date: Thu Aug 13 18:05:02 2009
New Revision: 803969

URL: http://svn.apache.org/viewvc?rev=803969&view=rev
Log:
HDFS-536. Support hflush at DFSClient. Contributed by Hairong Kuang.

Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=803969&r1=803968&r2=803969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Thu Aug 13 18:05:02 2009
@@ -13,6 +13,8 @@
 
     HDFS-461. Tool to analyze file size distribution in HDFS. (shv)
 
+    HDFS-536. Support hflush at DFSClient. (hairong)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=803969&r1=803968&r2=803969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Aug 13 18:05:02 2009
@@ -2414,11 +2414,29 @@
 
             // write out data to remote datanode
             blockStream.write(buf.array(), buf.position(), buf.remaining());
+            blockStream.flush();
 
             if (one.lastPacketInBlock) {
-              blockStream.writeInt(0); // indicate end-of-block 
+              synchronized (dataQueue) {
+                while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
+                  try {
+                    dataQueue.wait(1000);   // wait for acks to arrive from datanodes
+                  } catch (InterruptedException  e) {
+                  }
+                }
+              }
+              if (streamerClosed || hasError || !clientRunning) {
+                continue;
+              }
+
+              // done receiving all acks
+              if (response != null) {
+                response.close(); // notify responder to close
+              }
+              // indicate end-of-block
+              blockStream.writeInt(0);
+              blockStream.flush();
             }
-            blockStream.flush();
             if (LOG.isDebugEnabled()) {
               LOG.debug("DataStreamer block " + block +
                   " wrote packet seqno:" + one.seqno +
@@ -2442,18 +2460,6 @@
 
           // Is this block full?
           if (one.lastPacketInBlock) {
-            synchronized (dataQueue) {
-              while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
-                try {
-                  dataQueue.wait(1000);   // wait for acks to arrive from datanodes
-                } catch (InterruptedException  e) {
-                }
-              }
-            }
-            if (streamerClosed || hasError || !clientRunning) {
-              continue;
-            }
-
             LOG.debug("Closing old block " + block);
             this.setName("DataStreamer for file " + src);
             closeResponder();
@@ -2473,7 +2479,7 @@
       }
 
       private void closeInternal() {
-        closeResponder();
+        closeResponder();       // close and join
         closeStream();
         streamerClosed = true;
         closed = true;
@@ -3189,11 +3195,23 @@
     }
   
     /**
-     * All data is written out to datanodes. It is not guaranteed 
-     * that data has been flushed to persistent store on the 
-     * datanode. Block allocations are persisted on namenode.
+     * @deprecated As of HDFS 0.21.0, replaced by hflush
+     * @see #hflush()
      */
+    @Deprecated
     public synchronized void sync() throws IOException {
+      hflush();
+    }
+    
+    /**
+     * All data is flushed out to datanodes.
+     * It is a synchronous operation. When it returns,
+     * it guarantees that flushed data become visible to new readers. 
+     * It is not guaranteed that data has been flushed to 
+     * persistent store on the datanode. 
+     * Block allocations are persisted on namenode.
+     */
+    public synchronized void hflush() throws IOException {
       checkOpen();
       isClosed();
       try {

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java?rev=803969&r1=803968&r2=803969&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java Thu Aug 13 18:05:02 2009
@@ -49,7 +49,7 @@
       out.write(i);
     }
     // flush to make sure a block is allocated.
-    ((DFSOutputStream)(out.getWrappedStream())).sync();
+    ((DFSOutputStream)(out.getWrappedStream())).hflush();
     
     ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
     cluster.getNamesystem().DFSNodesStatus(dnList, dnList);



Mime
View raw message