hadoop-hdfs-commits mailing list archives

From hair...@apache.org
Subject svn commit: r1035386 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/aop/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/
Date Mon, 15 Nov 2010 18:23:43 GMT
Author: hairong
Date: Mon Nov 15 18:23:42 2010
New Revision: 1035386

URL: http://svn.apache.org/viewvc?rev=1035386&view=rev
Log:
HDFS-895. Allow hflush/sync to occur in parallel with new writes to the file. Contributed
by Todd Lipcon.

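Before this patch, hflush() was a synchronized method on DFSOutputStream, so a flush held the stream lock for the whole pipeline round trip and blocked every concurrent writer. After it, hflush() snapshots the last queued packet's sequence number under the lock and waits for the corresponding ack outside it. A minimal sketch of the usage pattern this enables (the FileSystem setup, path, and loop counts are illustrative, not part of the patch):

    // Sketch only: one thread hflushes while another keeps writing to the
    // same stream. Assumes "fs" is an initialized HDFS FileSystem and that
    // this runs in a method declared "throws Exception".
    final FSDataOutputStream out = fs.create(new Path("/parallel-demo.dat"));
    Thread flusher = new Thread() {
      public void run() {
        try {
          for (int i = 0; i < 100; i++) {
            out.hflush();        // waits only for packets queued so far
          }
        } catch (IOException ioe) {
          // a real test would hand this to the main thread; see the
          // AtomicReference pattern in TestMultiThreadedHflush below
        }
      }
    };
    flusher.start();
    for (int i = 0; i < 100; i++) {
      out.write(1);              // proceeds in parallel with the flushes
    }
    flusher.join();
    out.close();
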
Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1035386&r1=1035385&r2=1035386&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Mon Nov 15 18:23:42 2010
@@ -47,6 +47,9 @@ Trunk (unreleased changes)
     HDFS-811. Add metrics, failure reporting and additional tests for HDFS-457.
     (eli)
 
+    HDFS-895. Allow hflush/sync to occur in parallel with new writes
+    to the file. (Todd Lipcon via hairong)
+
   IMPROVEMENTS
 
     HDFS-1304. Add a new unit test for HftpFileSystem.open(..).  (szetszwo)

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1035386&r1=1035385&r2=1035386&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Mon Nov 15 18:23:42 2010
@@ -115,12 +115,14 @@ class DFSOutputStream extends FSOutputSu
   private Packet currentPacket = null;
   private DataStreamer streamer;
   private long currentSeqno = 0;
+  private long lastQueuedSeqno = -1;
+  private long lastAckedSeqno = -1;
   private long bytesCurBlock = 0; // bytes written in current block
   private int packetSize = 0; // write packet size, including the header.
   private int chunksPerPacket = 0;
   private volatile IOException lastException = null;
   private long artificialSlowdown = 0;
-  private long lastFlushOffset = -1; // offset when flush was invoked
+  private long lastFlushOffset = 0; // offset when flush was invoked
   //persist blocks on namenode
   private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
   private volatile boolean appendChunk = false;   // appending to existing partial block
@@ -434,6 +436,7 @@ class DFSOutputStream extends FSOutputSu
               one = dataQueue.getFirst(); // regular data packet
             }
           }
+          assert one != null;
 
           // get new block from namenode.
           if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
@@ -670,6 +673,7 @@ class DFSOutputStream extends FSOutputSu
             block.setNumBytes(one.getLastByteOffsetBlock());
 
             synchronized (dataQueue) {
+              lastAckedSeqno = seqno;
               ackQueue.removeFirst();
               dataQueue.notifyAll();
             }
@@ -720,8 +724,21 @@ class DFSOutputStream extends FSOutputSu
       
       if (!streamerClosed && dfsClient.clientRunning) {
         if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+
+          // If we had an error while closing the pipeline, we go through a fast-path
+          // where the BlockReceiver does not run. Instead, the DataNode just finalizes
+          // the block immediately during the 'connect ack' process. So, we want to pull
+          // the end-of-block packet from the dataQueue, since we don't actually have
+          // a true pipeline to send it over.
+          //
+          // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
+          // a client waiting on close() will be aware that the flush finished.
           synchronized (dataQueue) {
-            dataQueue.remove();  // remove the end of block packet
+            assert dataQueue.size() == 1;
+            Packet endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
+            assert endOfBlockPacket.lastPacketInBlock;
+            assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
+            lastAckedSeqno = endOfBlockPacket.seqno;
             dataQueue.notifyAll();
           }
           endBlock();
@@ -1131,14 +1148,20 @@ class DFSOutputStream extends FSOutputSu
     }
   }
 
-  private void queuePacket(Packet packet) {
+  private void queueCurrentPacket() {
     synchronized (dataQueue) {
-      dataQueue.addLast(packet);
+      if (currentPacket == null) return;
+      dataQueue.addLast(currentPacket);
+      lastQueuedSeqno = currentPacket.seqno;
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
+      }
+      currentPacket = null;
       dataQueue.notifyAll();
     }
   }
 
-  private void waitAndQueuePacket(Packet packet) throws IOException {
+  private void waitAndQueueCurrentPacket() throws IOException {
     synchronized (dataQueue) {
       // If queue is full, then wait till we have enough space
       while (!closed && dataQueue.size() + ackQueue.size()  > MAX_PACKETS) {
@@ -1148,7 +1171,7 @@ class DFSOutputStream extends FSOutputSu
         }
       }
       isClosed();
-      queuePacket(packet);
+      queueCurrentPacket();
     }
   }
 
@@ -1202,8 +1225,7 @@ class DFSOutputStream extends FSOutputSu
             ", blockSize=" + blockSize +
             ", appendChunk=" + appendChunk);
       }
-      waitAndQueuePacket(currentPacket);
-      currentPacket = null;
+      waitAndQueueCurrentPacket();
 
       // If the reopened file did not end at chunk boundary and the above
       // write filled up its partial chunk. Tell the summer to generate full 
@@ -1225,10 +1247,9 @@ class DFSOutputStream extends FSOutputSu
         currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
             bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
-        waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+        waitAndQueueCurrentPacket();
         bytesCurBlock = 0;
-        lastFlushOffset = -1;
+        lastFlushOffset = 0;
       }
     }
   }
@@ -1245,60 +1266,88 @@ class DFSOutputStream extends FSOutputSu
   * but not necessarily on the DN's OS buffers. 
    *
    * It is a synchronous operation. When it returns,
-   * it gurantees that flushed data become visible to new readers. 
+   * it guarantees that flushed data become visible to new readers. 
    * It is not guaranteed that data has been flushed to 
    * persistent store on the datanode. 
    * Block allocations are persisted on namenode.
    */
   @Override
-  public synchronized void hflush() throws IOException {
+  public void hflush() throws IOException {
     dfsClient.checkOpen();
     isClosed();
     try {
-      /* Record current blockOffset. This might be changed inside
-       * flushBuffer() where a partial checksum chunk might be flushed.
-       * After the flush, reset the bytesCurBlock back to its previous value,
-       * any partial checksum chunk will be sent now and in next packet.
-       */
-      long saveOffset = bytesCurBlock;
-
-      // flush checksum buffer, but keep checksum buffer intact
-      flushBuffer(true);
-
-      if(DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("DFSClient flush() : saveOffset " + saveOffset +  
+      long toWaitFor;
+      synchronized (this) {
+        /* Record current blockOffset. This might be changed inside
+         * flushBuffer() where a partial checksum chunk might be flushed.
+         * After the flush, reset the bytesCurBlock back to its previous value,
+         * any partial checksum chunk will be sent now and in next packet.
+         */
+        long saveOffset = bytesCurBlock;
+        Packet oldCurrentPacket = currentPacket;
+        // flush checksum buffer, but keep checksum buffer intact
+        flushBuffer(true);
+        // bytesCurBlock potentially incremented if there was buffered data
+
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug(
+            "DFSClient flush() : saveOffset " + saveOffset +  
             " bytesCurBlock " + bytesCurBlock +
             " lastFlushOffset " + lastFlushOffset);
-      }
-      
-      // Flush only if we haven't already flushed till this offset.
-      if (lastFlushOffset != bytesCurBlock) {
+        }
+        // Flush only if we haven't already flushed till this offset.
+        if (lastFlushOffset != bytesCurBlock) {
+          assert bytesCurBlock > lastFlushOffset;
+          // record the valid offset of this flush
+          lastFlushOffset = bytesCurBlock;
+          waitAndQueueCurrentPacket();
+        } else {
+          // We already flushed up to this offset.
+          // This means that we haven't written anything since the last flush
+          // (or the beginning of the file). Hence, we should not have any
+          // packet queued prior to this call, since the last flush set
+          // currentPacket = null.
+          assert oldCurrentPacket == null :
+            "Empty flush should not occur with a currentPacket";
 
-        // record the valid offset of this flush
-        lastFlushOffset = bytesCurBlock;
+          // just discard the current packet since it has already been sent.
+          currentPacket = null;
+        }
+        // Restore state of stream. Record the last flush offset 
+        // of the last full chunk that was flushed.
+        //
+        bytesCurBlock = saveOffset;
+        toWaitFor = lastQueuedSeqno;
+      } // end synchronized
 
-        // wait for all packets to be sent and acknowledged
-        flushInternal();
-      } else {
-        // just discard the current packet since it is already been sent.
-        currentPacket = null;
-      }
-      
-      // Restore state of stream. Record the last flush offset 
-      // of the last full chunk that was flushed.
-      //
-      bytesCurBlock = saveOffset;
+      waitForAckedSeqno(toWaitFor);
 
       // If any new blocks were allocated since the last flush, 
       // then persist block locations on namenode. 
       //
       if (persistBlocks.getAndSet(false)) {
-        dfsClient.namenode.fsync(src, dfsClient.clientName);
+        try {
+          dfsClient.namenode.fsync(src, dfsClient.clientName);
+        } catch (IOException ioe) {
+          DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
+          // If we got an error here, it might be because some other thread called
+          // close before our hflush completed. In that case, we should throw an
+          // exception that the stream is closed.
+          isClosed();
+          // If we aren't closed but failed to sync, we should expose that to the
+          // caller.
+          throw ioe;
+        }
       }
     } catch (IOException e) {
-        lastException = new IOException("IOException flush:" + e);
-        closeThreads(true);
-        throw e;
+      DFSClient.LOG.warn("Error while syncing", e);
+      synchronized (this) {
+        if (!closed) {
+          lastException = new IOException("IOException flush:" + e);
+          closeThreads(true);
+        }
+      }
+      throw e;
     }
   }
 
@@ -1339,26 +1388,39 @@ class DFSOutputStream extends FSOutputSu
    * Waits till all existing data is flushed and confirmations 
    * received from datanodes. 
    */
-  private synchronized void flushInternal() throws IOException {
-    dfsClient.checkOpen();
-    isClosed();
-    //
-    // If there is data in the current buffer, send it across
-    //
-    if (currentPacket != null) {
-      queuePacket(currentPacket);
-      currentPacket = null;
+  private void flushInternal() throws IOException {
+    long toWaitFor;
+    synchronized (this) {
+      dfsClient.checkOpen();
+      isClosed();
+      //
+      // If there is data in the current buffer, send it across
+      //
+      queueCurrentPacket();
+      toWaitFor = lastQueuedSeqno;
     }
 
+    waitForAckedSeqno(toWaitFor);
+  }
+
+  private void waitForAckedSeqno(long seqno) throws IOException {
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Waiting for ack for: " + seqno);
+    }
     synchronized (dataQueue) {
-      while (!closed && dataQueue.size() + ackQueue.size() > 0) {
+      while (!closed) {
+        isClosed();
+        if (lastAckedSeqno >= seqno) {
+          break;
+        }
         try {
-          dataQueue.wait();
-        } catch (InterruptedException  e) {
+          dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
         }
       }
-      isClosed();
     }
+    isClosed();
   }
 
   /**
@@ -1410,7 +1472,7 @@ class DFSOutputStream extends FSOutputSu
       flushBuffer();       // flush from all upper layers
 
       if (currentPacket != null) { 
-        waitAndQueuePacket(currentPacket);
+        waitAndQueueCurrentPacket();
       }
 
       if (bytesCurBlock != 0) {

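The core of the DFSOutputStream change is replacing "wait until both queues drain" with "wait until one specific sequence number is acked": hflush() records lastQueuedSeqno while holding the stream lock, releases the lock, and then blocks only in waitForAckedSeqno(), so other threads can keep queueing packets in the meantime. A standalone sketch of that hand-off, with hypothetical names standing in for the real fields:

    import java.io.IOException;

    // Illustrative distillation of the seqno hand-off above; not the real class.
    class SeqnoFlushSketch {
      private final Object dataQueue = new Object(); // stands in for the packet queue
      private long lastQueuedSeqno = -1;  // newest packet handed to the pipeline
      private long lastAckedSeqno = -1;   // newest packet acked by all datanodes
      private volatile boolean closed = false;

      // Writer side: queue a packet and remember its seqno.
      long queuePacket() {
        synchronized (dataQueue) {
          lastQueuedSeqno++;
          dataQueue.notifyAll();
          return lastQueuedSeqno;
        }
      }

      // ResponseProcessor side: an ack arrived for 'seqno'.
      void packetAcked(long seqno) {
        synchronized (dataQueue) {
          lastAckedSeqno = seqno;
          dataQueue.notifyAll();
        }
      }

      // hflush() side: wait for one seqno, not for an empty queue, so
      // packets queued by other threads after the snapshot don't delay us.
      void waitForAckedSeqno(long seqno) throws IOException {
        synchronized (dataQueue) {
          while (!closed && lastAckedSeqno < seqno) {
            try {
              dataQueue.wait(1000);
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
            }
          }
          if (closed) {
            throw new IOException("stream is closed");
          }
        }
      }
    }

As in the patch, the bounded wait(1000) is defensive against a missed notification; the normal wake-up path is the notifyAll() in the ack handler.
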
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1035386&r1=1035385&r2=1035386&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Nov 15 18:23:42 2010
@@ -1377,8 +1377,10 @@ public class FSDataset implements FSCons
     FileChannel channel = file.getChannel();
     long oldPos = channel.position();
     long newPos = oldPos - checksumSize;
-    DataNode.LOG.info("Changing meta file offset of block " + b + " from " + 
-        oldPos + " to " + newPos);
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Changing meta file offset of block " + b + " from " +
+          oldPos + " to " + newPos);
+    }
     channel.position(newPos);
   }
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1035386&r1=1035385&r2=1035386&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Mon Nov 15 18:23:42 2010
@@ -335,6 +335,7 @@ public class SecondaryNameNode implement
           public Void run() throws Exception {
             checkpointImage.cTime = sig.cTime;
             checkpointImage.checkpointTime = sig.checkpointTime;
+            checkpointImage.imageDigest = sig.imageDigest;
         
             // get fsimage
             String fileid = "getimage=1";

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj?rev=1035386&r1=1035385&r2=1035386&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj Mon Nov 15 18:23:42 2010
@@ -86,13 +86,13 @@ privileged public aspect DFSClientAspect
     LOG.info("FI: before pipelineClose:");
   }
 
-  pointcut checkAckQueue(DFSOutputStream.Packet cp):
-    call (void DFSOutputStream.waitAndQueuePacket(
-            DFSOutputStream.Packet))
+  pointcut checkAckQueue(DFSOutputStream stream):
+    call (void DFSOutputStream.waitAndQueueCurrentPacket())
     && withincode (void DFSOutputStream.writeChunk(..))
-    && args(cp);
+    && this(stream);
 
-  after(DFSOutputStream.Packet cp) : checkAckQueue (cp) {
+  after(DFSOutputStream stream) : checkAckQueue (stream) {
+    DFSOutputStream.Packet cp = stream.currentPacket;
     PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
     if (pTest != null && pTest instanceof PipelinesTest) {
       LOG.debug("FI: Recording packet # " + cp.seqno

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java?rev=1035386&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java Mon Nov 15 18:23:42 2010
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.log4j.Level;
+
+/**
+ * This class tests hflushing concurrently from many threads.
+ */
+public class TestMultiThreadedHflush {
+  static final int blockSize = 1024*1024;
+  static final int numBlocks = 10;
+  static final int fileSize = numBlocks * blockSize + 1;
+
+  private static final int NUM_THREADS = 10;
+  private static final int WRITE_SIZE = 517;
+  private static final int NUM_WRITES_PER_THREAD = 1000;
+  
+  private byte[] toWrite = null;
+
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /*
+   * creates a file but does not close it
+   */ 
+  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, (long)blockSize);
+    return stm;
+  }
+  
+  private void initBuffer(int size) {
+    long seed = AppendTestUtil.nextLong();
+    toWrite = AppendTestUtil.randomBytes(seed, size);
+  }
+
+  private class WriterThread extends Thread {
+    private final FSDataOutputStream stm;
+    private final AtomicReference<Throwable> thrown;
+    private final int numWrites;
+    private final CountDownLatch countdown;
+
+    public WriterThread(FSDataOutputStream stm,
+      AtomicReference<Throwable> thrown,
+      CountDownLatch countdown, int numWrites) {
+      this.stm = stm;
+      this.thrown = thrown;
+      this.numWrites = numWrites;
+      this.countdown = countdown;
+    }
+
+    public void run() {
+      try {
+        countdown.await();
+        for (int i = 0; i < numWrites && thrown.get() == null; i++) {
+          doAWrite();
+        }
+      } catch (Throwable t) {
+        thrown.compareAndSet(null, t);
+      }
+    }
+
+    private void doAWrite() throws IOException {
+      stm.write(toWrite);
+      stm.hflush();
+    }
+  }
+
+
+  /**
+   * Test case where a bunch of threads are both appending and flushing.
+   * They all finish before the file is closed.
+   */
+  @Test
+  public void testMultipleHflushers() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/multiple-hflushers.dat");
+    try {
+      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test case where a bunch of threads are continuously calling hflush() while another
+   * thread appends some data and then closes the file.
+   *
+   * The hflushing threads should eventually catch an IOException stating that the stream
+   * was closed -- and not an NPE or anything like that.
+   */
+  @Test
+  public void testHflushWhileClosing() throws Throwable {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/hflush-and-close.dat");
+
+    final FSDataOutputStream stm = createFile(fs, p, 1);
+
+
+    ArrayList<Thread> flushers = new ArrayList<Thread>();
+    final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+    try {
+      for (int i = 0; i < 10; i++) {
+        Thread flusher = new Thread() {
+            public void run() {
+              try {
+                while (true) {
+                  try {
+                    stm.hflush();
+                  } catch (IOException ioe) {
+                    if (!ioe.toString().contains("DFSOutputStream is closed")) {
+                      throw ioe;
+                    } else {
+                      return;
+                    }
+                  }
+                }
+              } catch (Throwable t) {
+                thrown.set(t);
+              }
+            }
+          };
+        flusher.start();
+        flushers.add(flusher);
+      }
+
+      // Write some data
+      for (int i = 0; i < 10000; i++) {
+        stm.write(1);
+      }
+
+      // Close it while the flushing threads are still flushing
+      stm.close();
+
+      // Wait for the flushers to all die.
+      for (Thread t : flushers) {
+        t.join();
+      }
+
+      // They should have all gotten the expected exception, not anything
+      // else.
+      if (thrown.get() != null) {
+        throw thrown.get();
+      }
+
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  public void doMultithreadedWrites(
+    Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
+    throws Exception {
+    initBuffer(bufferSize);
+
+    // create a new file.
+    FileSystem fs = p.getFileSystem(conf);
+    FSDataOutputStream stm = createFile(fs, p, 1);
+    System.out.println("Created file simpleFlush.dat");
+
+    // There have been a couple issues with flushing empty buffers, so do
+    // some empty flushes first.
+    stm.hflush();
+    stm.hflush();
+    stm.write(1);
+    stm.hflush();
+    stm.hflush();
+
+    CountDownLatch countdown = new CountDownLatch(1);
+    ArrayList<Thread> threads = new ArrayList<Thread>();
+    AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+    for (int i = 0; i < numThreads; i++) {
+      Thread t = new WriterThread(stm, thrown, countdown, numWrites);
+      threads.add(t);
+      t.start();
+    }
+
+    // Start all the threads at the same time for maximum raciness!
+    countdown.countDown();
+
+    for (Thread t : threads) {
+      t.join();
+    }
+    if (thrown.get() != null) {
+      throw new RuntimeException("Deferred", thrown.get());
+    }
+    stm.close();
+    System.out.println("Closed file.");
+  }
+
+  public static void main(String args[]) throws Exception {
+    if (args.length != 1) {
+      System.err.println(
+        "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
+        " <path to test file> ");
+      System.exit(1);
+    }
+    TestMultiThreadedHflush test = new TestMultiThreadedHflush();
+    Configuration conf = new Configuration();
+    Path p = new Path(args[0]);
+    long st = System.nanoTime();
+    test.doMultithreadedWrites(conf, p, 10, 511, 50000);
+    long et = System.nanoTime();
+
+    System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
+  }
+
+}

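The hflush() javadoc above promises that flushed data becomes visible to new readers, and the new test hammers that promise under contention. For reference, a minimal single-threaded sketch of the visibility check itself (path and sizes are illustrative; assumes "fs" is an HDFS FileSystem and the usual org.apache.hadoop.fs imports):

    // Sketch: a reader opened after hflush() should see the flushed bytes,
    // even though the writer has not called close() yet.
    byte[] data = new byte[517];
    new java.util.Random().nextBytes(data);
    FSDataOutputStream out = fs.create(new Path("/hflush-visibility.dat"));
    out.write(data);
    out.hflush();                              // visible to new readers from here on
    FSDataInputStream in = fs.open(new Path("/hflush-visibility.dat"));
    byte[] readBack = new byte[data.length];
    in.readFully(0, readBack);                 // positioned read of the flushed bytes
    in.close();
    out.close();
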

