Subject: svn commit: r1035386 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/aop/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/
From: hairong@apache.org
To: hdfs-commits@hadoop.apache.org
Date: Mon, 15 Nov 2010 18:23:43 -0000
Message-Id: <20101115182343.6BD8C23888E7@eris.apache.org>

Author: hairong
Date: Mon Nov 15 18:23:42 2010
New Revision: 1035386

URL: http://svn.apache.org/viewvc?rev=1035386&view=rev
Log:
HDFS-895. Allow hflush/sync to occur in parallel with new writes to the file. Contributed by Todd Lipcon.

Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1035386&r1=1035385&r2=1035386&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Mon Nov 15 18:23:42 2010
@@ -47,6 +47,9 @@ Trunk (unreleased changes)
     HDFS-811. Add metrics, failure reporting and additional tests for HDFS-457.
     (eli)
 
+    HDFS-895. Allow hflush/sync to occur in parallel with new writes
+    to the file. (Todd Lipcon via hairong)
+
   IMPROVEMENTS
 
     HDFS-1304. Add a new unit test for HftpFileSystem.open(..).  (szetszwo)
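In outline, the change works like this: hflush() snapshots the sequence number of the last queued packet while holding the stream lock, then releases that lock and waits for the acknowledgment, so other threads can keep writing during the wait. The sketch below distills that locking scheme; the class and method names are illustrative, not the actual DFSOutputStream members (though lastQueuedSeqno and lastAckedSeqno are the fields the patch really adds).

    // Illustrative distillation of the patch's locking scheme -- not the
    // committed code. Two locks are in play: the stream lock
    // (synchronized(this)) protects the write-side buffer, while the
    // dataQueue monitor protects the packet queue and ack bookkeeping.
    class HflushPattern {
      private final Object dataQueue = new Object(); // stands in for the real queue
      private long lastQueuedSeqno = -1;
      private long lastAckedSeqno = -1;

      // Writer path: buffer bytes and queue packets under the stream lock.
      synchronized void write(byte[] b) {
        synchronized (dataQueue) {
          lastQueuedSeqno++;           // packet handed to the streamer thread
          dataQueue.notifyAll();
        }
      }

      // hflush: snapshot the target seqno under the stream lock, then wait
      // for the ack *outside* it, so concurrent write() calls are not blocked.
      void hflush() throws InterruptedException {
        long toWaitFor;
        synchronized (this) {
          toWaitFor = lastQueuedSeqno; // everything buffered so far is queued
        }
        synchronized (dataQueue) {
          while (lastAckedSeqno < toWaitFor) {
            dataQueue.wait();          // the ack path notifies on dataQueue
          }
        }
      }

      // Ack path, called by the response-processing thread per acked packet.
      void ack(long seqno) {
        synchronized (dataQueue) {
          lastAckedSeqno = seqno;
          dataQueue.notifyAll();
        }
      }
    }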
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1035386&r1=1035385&r2=1035386&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Mon Nov 15 18:23:42 2010
@@ -115,12 +115,14 @@ class DFSOutputStream extends FSOutputSu
   private Packet currentPacket = null;
   private DataStreamer streamer;
   private long currentSeqno = 0;
+  private long lastQueuedSeqno = -1;
+  private long lastAckedSeqno = -1;
   private long bytesCurBlock = 0; // bytes writen in current block
   private int packetSize = 0; // write packet size, including the header.
   private int chunksPerPacket = 0;
   private volatile IOException lastException = null;
   private long artificialSlowdown = 0;
-  private long lastFlushOffset = -1; // offset when flush was invoked
+  private long lastFlushOffset = 0; // offset when flush was invoked
   //persist blocks on namenode
   private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
   private volatile boolean appendChunk = false;   // appending to existing partial block
@@ -434,6 +436,7 @@ class DFSOutputStream extends FSOutputSu
             one = dataQueue.getFirst(); // regular data packet
           }
         }
+        assert one != null;
 
         // get new block from namenode.
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
@@ -670,6 +673,7 @@ class DFSOutputStream extends FSOutputSu
           block.setNumBytes(one.getLastByteOffsetBlock());
 
           synchronized (dataQueue) {
+            lastAckedSeqno = seqno;
            ackQueue.removeFirst();
            dataQueue.notifyAll();
          }
@@ -720,8 +724,21 @@ class DFSOutputStream extends FSOutputSu
 
         if (!streamerClosed && dfsClient.clientRunning) {
           if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+
+            // If we had an error while closing the pipeline, we go through a fast-path
+            // where the BlockReceiver does not run. Instead, the DataNode just finalizes
+            // the block immediately during the 'connect ack' process. So, we want to pull
+            // the end-of-block packet from the dataQueue, since we don't actually have
+            // a true pipeline to send it over.
+            //
+            // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
+            // a client waiting on close() will be aware that the flush finished.
             synchronized (dataQueue) {
-              dataQueue.remove(); // remove the end of block packet
+              assert dataQueue.size() == 1;
+              Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
+              assert endOfBlockPacket.lastPacketInBlock;
+              assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
+              lastAckedSeqno = endOfBlockPacket.seqno;
               dataQueue.notifyAll();
             }
             endBlock();
@@ -1131,14 +1148,20 @@ class DFSOutputStream extends FSOutputSu
       }
     }
 
-  private void queuePacket(Packet packet) {
+  private void queueCurrentPacket() {
     synchronized (dataQueue) {
-      dataQueue.addLast(packet);
+      if (currentPacket == null) return;
+      dataQueue.addLast(currentPacket);
+      lastQueuedSeqno = currentPacket.seqno;
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
+      }
+      currentPacket = null;
       dataQueue.notifyAll();
     }
   }
 
-  private void waitAndQueuePacket(Packet packet) throws IOException {
+  private void waitAndQueueCurrentPacket() throws IOException {
     synchronized (dataQueue) {
       // If queue is full, then wait till we have enough space
       while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
@@ -1148,7 +1171,7 @@ class DFSOutputStream extends FSOutputSu
         }
       }
       isClosed();
-      queuePacket(packet);
+      queueCurrentPacket();
     }
   }
 
@@ -1202,8 +1225,7 @@ class DFSOutputStream extends FSOutputSu
             ", blockSize=" + blockSize +
             ", appendChunk=" + appendChunk);
       }
-      waitAndQueuePacket(currentPacket);
-      currentPacket = null;
+      waitAndQueueCurrentPacket();
 
       // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full
@@ -1225,10 +1247,9 @@ class DFSOutputStream extends FSOutputSu
           currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
               bytesCurBlock);
           currentPacket.lastPacketInBlock = true;
-          waitAndQueuePacket(currentPacket);
-          currentPacket = null;
+          waitAndQueueCurrentPacket();
           bytesCurBlock = 0;
-          lastFlushOffset = -1;
+          lastFlushOffset = 0;
         }
       }
     }
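The renamed helpers above (queuePacket -> queueCurrentPacket, waitAndQueuePacket -> waitAndQueueCurrentPacket) are more than cosmetic: once hflush() no longer holds the stream lock for its whole duration, "add currentPacket to the queue", "record its seqno", and "clear the field" must be one atomic step under the dataQueue lock. Restating the committed method with the invariant called out (the comments are editorial, not from the patch):

    private void queueCurrentPacket() {
      synchronized (dataQueue) {
        if (currentPacket == null) return;   // empty flush: nothing to queue
        dataQueue.addLast(currentPacket);
        // Invariant: lastQueuedSeqno advances in the same critical section in
        // which the packet becomes visible to the streamer, so a concurrent
        // hflush() that snapshots lastQueuedSeqno always waits for a seqno
        // at least as large as this packet's.
        lastQueuedSeqno = currentPacket.seqno;
        currentPacket = null;                // cleared before the lock drops
        dataQueue.notifyAll();
      }
    }

The null-check also makes an empty flush (nothing buffered since the last flush) a safe no-op, which the new hflush() relies on.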
@@ -1245,60 +1266,88 @@ class DFSOutputStream extends FSOutputSu
    * but not neccessary on the DN's OS buffers.
    *
    * It is a synchronous operation. When it returns,
-   * it gurantees that flushed data become visible to new readers.
+   * it guarantees that flushed data become visible to new readers.
    * It is not guaranteed that data has been flushed to
    * persistent store on the datanode.
    * Block allocations are persisted on namenode.
    */
   @Override
-  public synchronized void hflush() throws IOException {
+  public void hflush() throws IOException {
     dfsClient.checkOpen();
     isClosed();
     try {
-      /* Record current blockOffset. This might be changed inside
-       * flushBuffer() where a partial checksum chunk might be flushed.
-       * After the flush, reset the bytesCurBlock back to its previous value,
-       * any partial checksum chunk will be sent now and in next packet.
-       */
-      long saveOffset = bytesCurBlock;
-
-      // flush checksum buffer, but keep checksum buffer intact
-      flushBuffer(true);
-
-      if(DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("DFSClient flush() : saveOffset " + saveOffset +
+      long toWaitFor;
+      synchronized (this) {
+        /* Record current blockOffset. This might be changed inside
+         * flushBuffer() where a partial checksum chunk might be flushed.
+         * After the flush, reset the bytesCurBlock back to its previous value,
+         * any partial checksum chunk will be sent now and in next packet.
+         */
+        long saveOffset = bytesCurBlock;
+        Packet oldCurrentPacket = currentPacket;
+        // flush checksum buffer, but keep checksum buffer intact
+        flushBuffer(true);
+        // bytesCurBlock potentially incremented if there was buffered data
+
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug(
+            "DFSClient flush() : saveOffset " + saveOffset +
             " bytesCurBlock " + bytesCurBlock +
             " lastFlushOffset " + lastFlushOffset);
-      }
-
-      // Flush only if we haven't already flushed till this offset.
-      if (lastFlushOffset != bytesCurBlock) {
+        }
+        // Flush only if we haven't already flushed till this offset.
+        if (lastFlushOffset != bytesCurBlock) {
+          assert bytesCurBlock > lastFlushOffset;
+          // record the valid offset of this flush
+          lastFlushOffset = bytesCurBlock;
+          waitAndQueueCurrentPacket();
+        } else {
+          // We already flushed up to this offset.
+          // This means that we haven't written anything since the last flush
+          // (or the beginning of the file). Hence, we should not have any
+          // packet queued prior to this call, since the last flush set
+          // currentPacket = null.
+          assert oldCurrentPacket == null :
+            "Empty flush should not occur with a currentPacket";
 
-        // record the valid offset of this flush
-        lastFlushOffset = bytesCurBlock;
+          // just discard the current packet since it is already been sent.
+          currentPacket = null;
+        }
+        // Restore state of stream. Record the last flush offset
+        // of the last full chunk that was flushed.
+        //
+        bytesCurBlock = saveOffset;
+        toWaitFor = lastQueuedSeqno;
+      } // end synchronized
 
-        // wait for all packets to be sent and acknowledged
-        flushInternal();
-      } else {
-        // just discard the current packet since it is already been sent.
-        currentPacket = null;
-      }
-
-      // Restore state of stream. Record the last flush offset
-      // of the last full chunk that was flushed.
-      //
-      bytesCurBlock = saveOffset;
+      waitForAckedSeqno(toWaitFor);
 
       // If any new blocks were allocated since the last flush,
       // then persist block locations on namenode.
       //
       if (persistBlocks.getAndSet(false)) {
-        dfsClient.namenode.fsync(src, dfsClient.clientName);
+        try {
+          dfsClient.namenode.fsync(src, dfsClient.clientName);
+        } catch (IOException ioe) {
+          DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
+          // If we got an error here, it might be because some other thread called
+          // close before our hflush completed. In that case, we should throw an
+          // exception that the stream is closed.
+          isClosed();
+          // If we aren't closed but failed to sync, we should expose that to the
+          // caller.
+          throw ioe;
+        }
      }
     } catch (IOException e) {
-      lastException = new IOException("IOException flush:" + e);
-      closeThreads(true);
-      throw e;
+      DFSClient.LOG.warn("Error while syncing", e);
+      synchronized (this) {
+        if (!closed) {
+          lastException = new IOException("IOException flush:" + e);
+          closeThreads(true);
+        }
+      }
+      throw e;
     }
   }
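For callers, the visible contract is unchanged by the rewrite: per the javadoc above, hflush() still blocks until the data written before the call is visible to new readers; what is new is that other threads may keep writing while it blocks. A hedged usage sketch against the 0.22-era API (the path and record contents are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HflushExample {
      // After hflush() returns, a new reader sees every byte written before
      // the call; durability on the datanodes' disks is explicitly NOT promised.
      public static void appendAndFlush(Configuration conf, byte[] record)
          throws java.io.IOException {
        FileSystem fs = FileSystem.get(conf);
        FSDataOutputStream out = fs.create(new Path("/logs/app.log")); // illustrative
        try {
          out.write(record);
          out.hflush();  // blocks this thread only; concurrent writers proceed
        } finally {
          out.close();
        }
      }
    }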
@@ -1339,26 +1388,39 @@ class DFSOutputStream extends FSOutputSu
    * Waits till all existing data is flushed and confirmations
    * received from datanodes.
    */
-  private synchronized void flushInternal() throws IOException {
-    dfsClient.checkOpen();
-    isClosed();
-    //
-    // If there is data in the current buffer, send it across
-    //
-    if (currentPacket != null) {
-      queuePacket(currentPacket);
-      currentPacket = null;
+  private void flushInternal() throws IOException {
+    long toWaitFor;
+    synchronized (this) {
+      dfsClient.checkOpen();
+      isClosed();
+      //
+      // If there is data in the current buffer, send it across
+      //
+      queueCurrentPacket();
+      toWaitFor = lastQueuedSeqno;
     }
 
+    waitForAckedSeqno(toWaitFor);
+  }
+
+  private void waitForAckedSeqno(long seqno) throws IOException {
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Waiting for ack for: " + seqno);
+    }
     synchronized (dataQueue) {
-      while (!closed && dataQueue.size() + ackQueue.size() > 0) {
+      while (!closed) {
+        isClosed();
+        if (lastAckedSeqno >= seqno) {
+          break;
+        }
         try {
-          dataQueue.wait();
-        } catch (InterruptedException e) {
+          dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
         }
       }
-      isClosed();
     }
+    isClosed();
   }
 
   /**
@@ -1410,7 +1472,7 @@ class DFSOutputStream extends FSOutputSu
       flushBuffer();       // flush from all upper layers
 
       if (currentPacket != null) {
-        waitAndQueuePacket(currentPacket);
+        waitAndQueueCurrentPacket();
       }
 
       if (bytesCurBlock != 0) {
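waitForAckedSeqno() above is a textbook guarded wait with two defensive touches: the one-second bound on wait() means a waiter re-checks closed/lastException even if a notification is missed, and the InterruptedException handler re-asserts the thread's interrupt status instead of swallowing it. The same shape reduced to a self-contained sketch (names and the closed-check are illustrative, not the committed code):

    final class AckWaiter {
      private final Object monitor = new Object();
      private long lastAcked = -1;
      private boolean closed = false;

      // Generic form of the loop in waitForAckedSeqno(): re-test the predicate
      // on every wakeup, bound each wait so a lost notify or an asynchronous
      // close is still noticed, and record the interrupt rather than drop it.
      void waitForAck(long seqno) throws java.io.IOException {
        synchronized (monitor) {
          while (lastAcked < seqno) {
            if (closed) {
              throw new java.io.IOException("stream closed while waiting for ack");
            }
            try {
              monitor.wait(1000);                 // acker calls monitor.notifyAll()
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt(); // preserve status; keep waiting
            }
          }
        }
      }

      void ack(long seqno) {
        synchronized (monitor) {
          lastAcked = seqno;
          monitor.notifyAll();
        }
      }
    }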
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1035386&r1=1035385&r2=1035386&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Nov 15 18:23:42 2010
@@ -1377,8 +1377,10 @@ public class FSDataset implements FSCons
     FileChannel channel = file.getChannel();
     long oldPos = channel.position();
     long newPos = oldPos - checksumSize;
-    DataNode.LOG.info("Changing meta file offset of block " + b + " from " +
-        oldPos + " to " + newPos);
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Changing meta file offset of block " + b + " from " +
+          oldPos + " to " + newPos);
+    }
     channel.position(newPos);
   }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1035386&r1=1035385&r2=1035386&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Mon Nov 15 18:23:42 2010
@@ -335,6 +335,7 @@ public class SecondaryNameNode implement
         public Void run() throws Exception {
           checkpointImage.cTime = sig.cTime;
           checkpointImage.checkpointTime = sig.checkpointTime;
+          checkpointImage.imageDigest = sig.imageDigest;
 
           // get fsimage
           String fileid = "getimage=1";

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj?rev=1035386&r1=1035385&r2=1035386&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj Mon Nov 15 18:23:42 2010
@@ -86,13 +86,13 @@ privileged public aspect DFSClientAspect
     LOG.info("FI: before pipelineClose:");
   }
 
-  pointcut checkAckQueue(DFSOutputStream.Packet cp):
-    call (void DFSOutputStream.waitAndQueuePacket(
-        DFSOutputStream.Packet))
+  pointcut checkAckQueue(DFSOutputStream stream):
+    call (void DFSOutputStream.waitAndQueueCurrentPacket())
     && withincode (void DFSOutputStream.writeChunk(..))
-    && args(cp);
+    && this(stream);
 
-  after(DFSOutputStream.Packet cp) : checkAckQueue (cp) {
+  after(DFSOutputStream stream) : checkAckQueue (stream) {
+    DFSOutputStream.Packet cp = stream.currentPacket;
     PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
     if (pTest != null && pTest instanceof PipelinesTest) {
       LOG.debug("FI: Recording packet # " + cp.seqno

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java?rev=1035386&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java Mon Nov 15 18:23:42 2010
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.log4j.Level;
+
+/**
+ * This class tests hflushing concurrently from many threads.
+ */
+public class TestMultiThreadedHflush {
+  static final int blockSize = 1024*1024;
+  static final int numBlocks = 10;
+  static final int fileSize = numBlocks * blockSize + 1;
+
+  private static final int NUM_THREADS = 10;
+  private static final int WRITE_SIZE = 517;
+  private static final int NUM_WRITES_PER_THREAD = 1000;
+
+  private byte[] toWrite = null;
+
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /*
+   * creates a file but does not close it
+   */
+  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+        fileSys.getConf().getInt("io.file.buffer.size", 4096),
+        (short)repl, (long)blockSize);
+    return stm;
+  }
+
+  private void initBuffer(int size) {
+    long seed = AppendTestUtil.nextLong();
+    toWrite = AppendTestUtil.randomBytes(seed, size);
+  }
+
+  private class WriterThread extends Thread {
+    private final FSDataOutputStream stm;
+    private final AtomicReference<Throwable> thrown;
+    private final int numWrites;
+    private final CountDownLatch countdown;
+
+    public WriterThread(FSDataOutputStream stm,
+      AtomicReference<Throwable> thrown,
+      CountDownLatch countdown, int numWrites) {
+      this.stm = stm;
+      this.thrown = thrown;
+      this.numWrites = numWrites;
+      this.countdown = countdown;
+    }
+
+    public void run() {
+      try {
+        countdown.await();
+        for (int i = 0; i < numWrites && thrown.get() == null; i++) {
+          doAWrite();
+        }
+      } catch (Throwable t) {
+        thrown.compareAndSet(null, t);
+      }
+    }
+
+    private void doAWrite() throws IOException {
+      stm.write(toWrite);
+      stm.hflush();
+    }
+  }
+
+
+  /**
+   * Test case where a bunch of threads are both appending and flushing.
+   * They all finish before the file is closed.
+   */
+  @Test
+  public void testMultipleHflushers() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/multiple-hflushers.dat");
+    try {
+      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
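The CountDownLatch in this test is a start gate: every WriterThread parks in await() until the coordinator calls countDown(), so all threads hit the stream at once instead of trickling in as they are started (the test's own comment: "maximum raciness"). The idiom in isolation, as a self-contained sketch with era-appropriate anonymous classes:

    import java.util.concurrent.CountDownLatch;

    public class StartGateDemo {
      public static void main(String[] args) {
        final CountDownLatch gate = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
          new Thread() {
            public void run() {
              try {
                gate.await();      // park until the gate opens
              } catch (InterruptedException ie) {
                return;
              }
              // ... all threads begin their racy work here, together ...
            }
          }.start();
        }
        gate.countDown();          // release every waiter at once
      }
    }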
+  /**
+   * Test case where a bunch of threads are continuously calling hflush() while another
+   * thread appends some data and then closes the file.
+   *
+   * The hflushing threads should eventually catch an IOException stating that the stream
+   * was closed -- and not an NPE or anything like that.
+   */
+  @Test
+  public void testHflushWhileClosing() throws Throwable {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/hflush-and-close.dat");
+
+    final FSDataOutputStream stm = createFile(fs, p, 1);
+
+
+    ArrayList<Thread> flushers = new ArrayList<Thread>();
+    final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+    try {
+      for (int i = 0; i < 10; i++) {
+        Thread flusher = new Thread() {
+          public void run() {
+            try {
+              while (true) {
+                try {
+                  stm.hflush();
+                } catch (IOException ioe) {
+                  if (!ioe.toString().contains("DFSOutputStream is closed")) {
+                    throw ioe;
+                  } else {
+                    return;
+                  }
+                }
+              }
+            } catch (Throwable t) {
+              thrown.set(t);
+            }
+          }
+        };
+        flusher.start();
+        flushers.add(flusher);
+      }
+
+      // Write some data
+      for (int i = 0; i < 10000; i++) {
+        stm.write(1);
+      }
+
+      // Close it while the flushing threads are still flushing
+      stm.close();
+
+      // Wait for the flushers to all die.
+      for (Thread t : flushers) {
+        t.join();
+      }
+
+      // They should have all gotten the expected exception, not anything
+      // else.
+      if (thrown.get() != null) {
+        throw thrown.get();
+      }
+
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  public void doMultithreadedWrites(
+    Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
+    throws Exception {
+    initBuffer(bufferSize);
+
+    // create a new file.
+    FileSystem fs = p.getFileSystem(conf);
+    FSDataOutputStream stm = createFile(fs, p, 1);
+    System.out.println("Created file simpleFlush.dat");
+
+    // There have been a couple issues with flushing empty buffers, so do
+    // some empty flushes first.
+    stm.hflush();
+    stm.hflush();
+    stm.write(1);
+    stm.hflush();
+    stm.hflush();
+
+    CountDownLatch countdown = new CountDownLatch(1);
+    ArrayList<Thread> threads = new ArrayList<Thread>();
+    AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+    for (int i = 0; i < numThreads; i++) {
+      Thread t = new WriterThread(stm, thrown, countdown, numWrites);
+      threads.add(t);
+      t.start();
+    }
+
+    // Start all the threads at the same time for maximum raciness!
+    countdown.countDown();
+
+    for (Thread t : threads) {
+      t.join();
+    }
+    if (thrown.get() != null) {
+      throw new RuntimeException("Deferred", thrown.get());
+    }
+    stm.close();
+    System.out.println("Closed file.");
+  }
+
+  public static void main(String args[]) throws Exception {
+    if (args.length != 1) {
+      System.err.println(
+        "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
+        " <path to test file> ");
+      System.exit(1);
+    }
+    TestMultiThreadedHflush test = new TestMultiThreadedHflush();
+    Configuration conf = new Configuration();
+    Path p = new Path(args[0]);
+    long st = System.nanoTime();
+    test.doMultithreadedWrites(conf, p, 10, 511, 50000);
+    long et = System.nanoTime();
+
+    System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
+  }
+
+}
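One last idiom worth noting from the test: worker threads record only the first failure via thrown.compareAndSet(null, t), and the coordinating thread rethrows it after join() so JUnit (or main) sees the error. Reduced to a self-contained sketch (the simulated failure is illustrative):

    import java.util.concurrent.atomic.AtomicReference;

    public class FirstErrorDemo {
      // compareAndSet(null, t) loses the race for every error after the
      // first, so exactly one failure survives to be rethrown after join().
      public static void main(String[] args) throws Exception {
        final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
        Thread worker = new Thread() {
          public void run() {
            try {
              throw new IllegalStateException("simulated worker failure");
            } catch (Throwable t) {
              thrown.compareAndSet(null, t);  // keep the first error only
            }
          }
        };
        worker.start();
        worker.join();
        if (thrown.get() != null) {
          throw new RuntimeException("Deferred", thrown.get());
        }
      }
    }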