hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1346050 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/java/org/apache/hadoop/hdfs/server/...
Date Mon, 04 Jun 2012 17:43:51 GMT
Author: szetszwo
Date: Mon Jun  4 17:43:51 2012
New Revision: 1346050

URL: http://svn.apache.org/viewvc?rev=1346050&view=rev
Log:
svn merge -c 1344419 from trunk for HDFS-744. Support hsync in HDFS.

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java
      - copied unchanged from r1344419, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1344419

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jun  4 17:43:51 2012
@@ -6,6 +6,8 @@ Release 2.0.1-alpha - UNRELEASED
 
   NEW FEATURES
 
+    HDFS-744. Support hsync in HDFS. (Lars Hofhansl via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1344419

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Mon Jun  4 17:43:51 2012
@@ -129,11 +129,13 @@ public class DFSOutputStream extends FSO
   private long initialFileSize = 0; // at time of file open
   private Progressable progress;
   private final short blockReplication; // replication factor of file
+  private boolean shouldSyncBlock = false; // force blocks to disk upon close
   
   private class Packet {
     long    seqno;               // sequencenumber of buffer in block
     long    offsetInBlock;       // offset in block
-    boolean lastPacketInBlock;   // is this the last packet in block?
+    private boolean lastPacketInBlock;   // is this the last packet in block?
+    boolean syncBlock;          // this packet forces the current block to disk
     int     numChunks;           // number of chunks currently in packet
     int     maxChunks;           // max chunks in packet
 
@@ -245,7 +247,7 @@ public class DFSOutputStream extends FSO
       buffer.mark();
 
       PacketHeader header = new PacketHeader(
-        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen);
+        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
       header.putInBuffer(buffer);
       
       buffer.reset();
@@ -1249,6 +1251,7 @@ public class DFSOutputStream extends FSO
       long blockSize, Progressable progress, int buffersize,
       DataChecksum checksum) throws IOException {
     this(dfsClient, src, blockSize, progress, checksum, replication);
+    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
 
     computePacketChunkSize(dfsClient.getConf().writePacketSize,
         checksum.getBytesPerChecksum());
@@ -1431,6 +1434,7 @@ public class DFSOutputStream extends FSO
         currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
             bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
+        currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
         bytesCurBlock = 0;
         lastFlushOffset = 0;
@@ -1456,6 +1460,24 @@ public class DFSOutputStream extends FSO
    */
   @Override
   public void hflush() throws IOException {
+    flushOrSync(false);
+  }
+
+  /**
+   * The expected semantics is all data have flushed out to all replicas 
+   * and all replicas have done posix fsync equivalent - ie the OS has 
+   * flushed it to the disk device (but the disk may have it in its cache).
+   * 
+   * Note that only the current block is flushed to the disk device.
+   * To guarantee durable sync across block boundaries the stream should
+   * be created with {@link CreateFlag#SYNC_BLOCK}.
+   */
+  @Override
+  public void hsync() throws IOException {
+    flushOrSync(true);
+  }
+
+  private void flushOrSync(boolean isSync) throws IOException {
     dfsClient.checkOpen();
     isClosed();
     try {
@@ -1483,7 +1505,13 @@ public class DFSOutputStream extends FSO
           assert bytesCurBlock > lastFlushOffset;
           // record the valid offset of this flush
           lastFlushOffset = bytesCurBlock;
-          waitAndQueueCurrentPacket();
+          if (isSync && currentPacket == null) {
+            // Nothing to send right now,
+            // but sync was requested.
+            // Send an empty packet
+            currentPacket = new Packet(packetSize, chunksPerPacket,
+                bytesCurBlock);
+          }
         } else {
           // We already flushed up to this offset.
           // This means that we haven't written anything since the last flush
@@ -1493,8 +1521,21 @@ public class DFSOutputStream extends FSO
           assert oldCurrentPacket == null :
             "Empty flush should not occur with a currentPacket";
 
-          // just discard the current packet since it is already been sent.
-          currentPacket = null;
+          if (isSync && bytesCurBlock > 0) {
+            // Nothing to send right now,
+            // and the block was partially written,
+            // and sync was requested.
+            // So send an empty sync packet.
+            currentPacket = new Packet(packetSize, chunksPerPacket,
+                bytesCurBlock);
+          } else {
+            // just discard the current packet since it is already been sent.
+            currentPacket = null;
+          }
+        }
+        if (currentPacket != null) {
+          currentPacket.syncBlock = isSync;
+          waitAndQueueCurrentPacket();          
         }
         // Restore state of stream. Record the last flush offset 
         // of the last full chunk that was flushed.
@@ -1546,18 +1587,6 @@ public class DFSOutputStream extends FSO
   }
 
   /**
-   * The expected semantics is all data have flushed out to all replicas 
-   * and all replicas have done posix fsync equivalent - ie the OS has 
-   * flushed it to the disk device (but the disk may have it in its cache).
-   * 
-   * Right now by default it is implemented as hflush
-   */
-  @Override
-  public synchronized void hsync() throws IOException {
-    hflush();
-  }
-
-  /**
    * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
    */
   @Deprecated
@@ -1681,6 +1710,7 @@ public class DFSOutputStream extends FSO
         currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
             bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
+        currentPacket.syncBlock = shouldSyncBlock;
       }
 
       flushInternal();             // flush all data to Datanodes

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Mon Jun  4 17:43:51 2012
@@ -223,12 +223,19 @@ public class DistributedFileSystem exten
 
   @Override
   public HdfsDataOutputStream create(Path f, FsPermission permission,
-    boolean overwrite, int bufferSize, short replication, long blockSize,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    return create(f, permission,
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
+        blockSize, progress);
+  }
+  
+  @Override
+  public HdfsDataOutputStream create(Path f, FsPermission permission,
+    EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
     Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
-    final EnumSet<CreateFlag> cflags = overwrite?
-        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-        : EnumSet.of(CreateFlag.CREATE);
     final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
         replication, blockSize, progress, bufferSize);
     return new HdfsDataOutputStream(out, statistics);
@@ -249,6 +256,7 @@ public class DistributedFileSystem exten
   /**
    * Same as create(), except fails if parent directory doesn't already exist.
    */
+  @Override
   public HdfsDataOutputStream createNonRecursive(Path f, FsPermission permission,
       EnumSet<CreateFlag> flag, int bufferSize, short replication,
       long blockSize, Progressable progress) throws IOException {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Mon Jun  4 17:43:51 2012
@@ -40,6 +40,7 @@ public class PacketHeader {
       .setSeqno(0)
       .setLastPacketInBlock(false)
       .setDataLen(0)
+      .setSyncBlock(false)
       .build().getSerializedSize();
   public static final int PKT_HEADER_LEN =
     6 + PROTO_SIZE;
@@ -51,13 +52,14 @@ public class PacketHeader {
   }
 
   public PacketHeader(int packetLen, long offsetInBlock, long seqno,
-                      boolean lastPacketInBlock, int dataLen) {
+                      boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
     this.packetLen = packetLen;
     proto = PacketHeaderProto.newBuilder()
       .setOffsetInBlock(offsetInBlock)
       .setSeqno(seqno)
       .setLastPacketInBlock(lastPacketInBlock)
       .setDataLen(dataLen)
+      .setSyncBlock(syncBlock)
       .build();
   }
 
@@ -81,6 +83,10 @@ public class PacketHeader {
     return packetLen;
   }
 
+  public boolean getSyncBlock() {
+    return proto.getSyncBlock();
+  }
+
   @Override
   public String toString() {
     return "PacketHeader with packetLen=" + packetLen +

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Jun  4 17:43:51 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -110,6 +111,8 @@ class BlockReceiver implements Closeable
   private final BlockConstructionStage stage;
   private final boolean isTransfer;
 
+  private boolean syncOnClose;
+
   BlockReceiver(final ExtendedBlock block, final DataInputStream in,
       final String inAddr, final String myAddr,
       final BlockConstructionStage stage, 
@@ -245,14 +248,18 @@ class BlockReceiver implements Closeable
    * close files.
    */
   public void close() throws IOException {
-
     IOException ioe = null;
+    if (syncOnClose && (out != null || checksumOut != null)) {
+      datanode.metrics.incrFsyncCount();      
+    }
     // close checksum file
     try {
       if (checksumOut != null) {
         checksumOut.flush();
-        if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) {
+        if (syncOnClose && (cout instanceof FileOutputStream)) {
+          long start = Util.now();
           ((FileOutputStream)cout).getChannel().force(true);
+          datanode.metrics.addFsync(Util.now() - start);
         }
         checksumOut.close();
         checksumOut = null;
@@ -267,8 +274,10 @@ class BlockReceiver implements Closeable
     try {
       if (out != null) {
         out.flush();
-        if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) {
+        if (syncOnClose && (out instanceof FileOutputStream)) {
+          long start = Util.now();
           ((FileOutputStream)out).getChannel().force(true);
+          datanode.metrics.addFsync(Util.now() - start);
         }
         out.close();
         out = null;
@@ -290,12 +299,25 @@ class BlockReceiver implements Closeable
    * Flush block data and metadata files to disk.
    * @throws IOException
    */
-  void flush() throws IOException {
+  void flushOrSync(boolean isSync) throws IOException {
+    if (isSync && (out != null || checksumOut != null)) {
+      datanode.metrics.incrFsyncCount();      
+    }
     if (checksumOut != null) {
       checksumOut.flush();
+      if (isSync && (cout instanceof FileOutputStream)) {
+        long start = Util.now();
+        ((FileOutputStream)cout).getChannel().force(true);
+        datanode.metrics.addFsync(Util.now() - start);
+      }
     }
     if (out != null) {
       out.flush();
+      if (isSync && (out instanceof FileOutputStream)) {
+        long start = Util.now();
+        ((FileOutputStream)out).getChannel().force(true);
+        datanode.metrics.addFsync(Util.now() - start);
+      }
     }
   }
 
@@ -533,7 +555,9 @@ class BlockReceiver implements Closeable
       header.getOffsetInBlock(),
       header.getSeqno(),
       header.isLastPacketInBlock(),
-      header.getDataLen(), endOfHeader);
+      header.getDataLen(),
+      header.getSyncBlock(),
+      endOfHeader);
   }
 
   /**
@@ -549,15 +573,19 @@ class BlockReceiver implements Closeable
    * returns the number of data bytes that the packet has.
    */
   private int receivePacket(long offsetInBlock, long seqno,
-      boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
+      boolean lastPacketInBlock, int len, boolean syncBlock,
+      int endOfHeader) throws IOException {
     if (LOG.isDebugEnabled()){
       LOG.debug("Receiving one packet for block " + block +
                 " of length " + len +
                 " seqno " + seqno +
                 " offsetInBlock " + offsetInBlock +
+                " syncBlock " + syncBlock +
                 " lastPacketInBlock " + lastPacketInBlock);
     }
-    
+    // make sure the block gets sync'ed upon close
+    this.syncOnClose |= syncBlock && lastPacketInBlock;
+
     // update received bytes
     long firstByteInBlock = offsetInBlock;
     offsetInBlock += len;
@@ -587,6 +615,10 @@ class BlockReceiver implements Closeable
       if(LOG.isDebugEnabled()) {
         LOG.debug("Receiving an empty packet or the end of the block " + block);
       }
+      // flush unless close() would flush anyway
+      if (syncBlock && !lastPacketInBlock) {
+        flushOrSync(true);
+      }
     } else {
       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                             checksumSize;
@@ -677,8 +709,8 @@ class BlockReceiver implements Closeable
             );
             checksumOut.write(pktBuf, checksumOff, checksumLen);
           }
-          /// flush entire packet
-          flush();
+          /// flush entire packet, sync unless close() will sync
+          flushOrSync(syncBlock && !lastPacketInBlock);
           
           replicaInfo.setLastChecksumAndDataLen(
             offsetInBlock, lastChunkChecksum
@@ -730,6 +762,7 @@ class BlockReceiver implements Closeable
       String mirrAddr, DataTransferThrottler throttlerArg,
       DatanodeInfo[] downstreams) throws IOException {
 
+      syncOnClose = datanode.getDnConf().syncOnClose;
       boolean responderClosed = false;
       mirrorOut = mirrOut;
       mirrorAddr = mirrAddr;
@@ -768,7 +801,7 @@ class BlockReceiver implements Closeable
           datanode.data.convertTemporaryToRbw(block);
         } else {
           // for isDatnode or TRANSFER_FINALIZED
-          // Finalize the block. Does this fsync()?
+          // Finalize the block.
           datanode.data.finalizeBlock(block);
         }
         datanode.metrics.incrBlocksWritten();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Mon Jun  4 17:43:51 2012
@@ -701,8 +701,9 @@ class BlockSender implements java.io.Clo
    */
   private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
     pkt.clear();
+    // both syncBlock and syncPacket are false
     PacketHeader header = new PacketHeader(packetLen, offset, seqno,
-        (dataLen == 0), dataLen);
+        (dataLen == 0), dataLen, false);
     header.putInBuffer(pkt);
   }
   

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Mon Jun  4 17:43:51 2012
@@ -61,6 +61,8 @@ public class DataNodeMetrics {
   @Metric MutableCounterLong writesFromLocalClient;
   @Metric MutableCounterLong writesFromRemoteClient;
   @Metric MutableCounterLong blocksGetLocalPathInfo;
+
+  @Metric MutableCounterLong fsyncCount;
   
   @Metric MutableCounterLong volumeFailures;
 
@@ -72,6 +74,8 @@ public class DataNodeMetrics {
   @Metric MutableRate heartbeats;
   @Metric MutableRate blockReports;
 
+  @Metric MutableRate fsync;
+
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
 
@@ -151,6 +155,14 @@ public class DataNodeMetrics {
     blocksRead.incr();
   }
 
+  public void incrFsyncCount() {
+    fsyncCount.incr();
+  }
+
+  public void addFsync(long latency) {
+    fsync.add(latency);
+  }
+
   public void shutdown() {
     DefaultMetricsSystem.shutdown();
   }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Mon Jun  4 17:43:51 2012
@@ -113,6 +113,7 @@ message PacketHeaderProto {
   required sfixed64 seqno = 2;
   required bool lastPacketInBlock = 3;
   required sfixed32 dataLen = 4;
+  optional bool syncBlock = 5 [default = false];
 }
 
 enum Status {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java Mon Jun  4 17:43:51 2012
@@ -139,7 +139,7 @@ public class AppendTestUtil {
   /**
    *  create a buffer that contains the entire test file data.
    */
-  static byte[] initBuffer(int size) {
+  public static byte[] initBuffer(int size) {
     if (seed == -1)
       seed = nextLong();
     return randomBytes(seed, size);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Mon Jun  4 17:43:51 2012
@@ -159,7 +159,8 @@ public class TestDataTransferProtocol ex
       block.getNumBytes(), // OffsetInBlock
       100,                 // sequencenumber
       true,                // lastPacketInBlock
-      0);                  // chunk length
+      0,                   // chunk length
+      false);               // sync block
     hdr.write(sendOut);
     sendOut.writeInt(0);           // zero checksum
 
@@ -402,7 +403,8 @@ public class TestDataTransferProtocol ex
       0,     // offset in block,
       100,   // seqno
       false, // last packet
-      -1 - random.nextInt(oneMil)); // bad datalen
+      -1 - random.nextInt(oneMil), // bad datalen
+      false);
     hdr.write(sendOut);
 
     sendResponse(Status.SUCCESS, "", null, recvOut);
@@ -424,7 +426,8 @@ public class TestDataTransferProtocol ex
       0,     // OffsetInBlock
       100,   // sequencenumber
       true,  // lastPacketInBlock
-      0);    // chunk length
+      0,     // chunk length
+      false);    
     hdr.write(sendOut);
     sendOut.writeInt(0);           // zero checksum
     sendOut.flush();
@@ -508,8 +511,8 @@ public class TestDataTransferProtocol ex
       1024,                // OffsetInBlock
       100,                 // sequencenumber
       false,               // lastPacketInBlock
-      4096);               // chunk length
-
+      4096,                // chunk length
+      false);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     hdr.write(new DataOutputStream(baos));
 



Mime
View raw message