Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BA98D10392 for ; Mon, 7 Oct 2013 03:49:01 +0000 (UTC) Received: (qmail 27333 invoked by uid 500); 7 Oct 2013 03:48:10 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 27254 invoked by uid 500); 7 Oct 2013 03:48:01 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 27226 invoked by uid 99); 7 Oct 2013 03:47:57 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Oct 2013 03:47:57 +0000 X-ASF-Spam-Status: No, hits=-1999.0 required=5.0 tests=ALL_TRUSTED,FRT_OFFER2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Oct 2013 03:47:53 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 8BAE72388A67; Mon, 7 Oct 2013 03:47:31 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1529738 - in /hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ Date: Mon, 07 Oct 2013 03:47:31 -0000 To: hdfs-commits@hadoop.apache.org From: brandonli@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131007034731.8BAE72388A67@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: brandonli Date: Mon Oct 7 03:47:30 2013 New Revision: 1529738 URL: http://svn.apache.org/r1529738 Log: HDFS-5259. Merging change r1529735 from branch-2 Added: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java - copied unchanged from r1529735, hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1529738&r1=1529737&r2=1529738&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original) +++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Mon Oct 7 03:47:30 2013 @@ -22,6 +22,7 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.security.InvalidParameterException; import java.util.EnumSet; @@ -55,6 +56,7 @@ import org.apache.hadoop.oncrpc.security import org.apache.hadoop.util.Daemon; import org.jboss.netty.channel.Channel; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -362,6 +364,30 @@ class OpenFileCtx { } } + @VisibleForTesting + public static void alterWriteRequest(WRITE3Request request, long cachedOffset) { + long offset = request.getOffset(); + int count = request.getCount(); + long smallerCount = offset + count - cachedOffset; + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Got overwrite with appended data (%d-%d)," + + " current offset %d," + " drop the overlapped section (%d-%d)" + + " and append new data (%d-%d).", offset, (offset + count - 1), + cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset + + count - 1))); + } + + ByteBuffer data = request.getData(); + Preconditions.checkState(data.position() == 0, + "The write request data has non-zero position"); + data.position((int) (cachedOffset - offset)); + Preconditions.checkState(data.limit() - data.position() == smallerCount, + "The write request buffer has wrong limit/position regarding count"); + + request.setOffset(cachedOffset); + request.setCount((int) smallerCount); + } + /** * Creates and adds a WriteCtx into the pendingWrites map. This is a * synchronized method to handle concurrent writes. @@ -374,12 +400,40 @@ class OpenFileCtx { long offset = request.getOffset(); int count = request.getCount(); long cachedOffset = nextOffset.get(); - + int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT; + if (LOG.isDebugEnabled()) { LOG.debug("requesed offset=" + offset + " and current offset=" + cachedOffset); } + // Handle a special case first + if ((offset < cachedOffset) && (offset + count > cachedOffset)) { + // One Linux client behavior: after a file is closed and reopened to + // write, the client sometimes combines previous written data(could still + // be in kernel buffer) with newly appended data in one write. This is + // usually the first write after file reopened. In this + // case, we log the event and drop the overlapped section. + LOG.warn(String.format("Got overwrite with appended data (%d-%d)," + + " current offset %d," + " drop the overlapped section (%d-%d)" + + " and append new data (%d-%d).", offset, (offset + count - 1), + cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset + + count - 1))); + + if (!pendingWrites.isEmpty()) { + LOG.warn("There are other pending writes, fail this jumbo write"); + return null; + } + + LOG.warn("Modify this write to write only the appended data"); + alterWriteRequest(request, cachedOffset); + + // Update local variable + originalCount = count; + offset = request.getOffset(); + count = request.getCount(); + } + // Fail non-append call if (offset < cachedOffset) { LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + "," @@ -389,8 +443,9 @@ class OpenFileCtx { DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP : WriteCtx.DataState.ALLOW_DUMP; WriteCtx writeCtx = new WriteCtx(request.getHandle(), - request.getOffset(), request.getCount(), request.getStableHow(), - request.getData().array(), channel, xid, false, dataState); + request.getOffset(), request.getCount(), originalCount, + request.getStableHow(), request.getData(), channel, xid, false, + dataState); if (LOG.isDebugEnabled()) { LOG.debug("Add new write to the list with nextOffset " + cachedOffset + " and requesed offset=" + offset); @@ -421,8 +476,7 @@ class OpenFileCtx { WRITE3Response response; long cachedOffset = nextOffset.get(); if (offset + count > cachedOffset) { - LOG.warn("Haven't noticed any partial overwrite for a sequential file" - + " write requests. Treat it as a real random write, no support."); + LOG.warn("Treat this jumbo write as a real random write, no support."); response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0, WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF); } else { @@ -641,6 +695,7 @@ class OpenFileCtx { private void addWrite(WriteCtx writeCtx) { long offset = writeCtx.getOffset(); int count = writeCtx.getCount(); + // For the offset range (min, max), min is inclusive, and max is exclusive pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx); } @@ -753,19 +808,7 @@ class OpenFileCtx { long offset = writeCtx.getOffset(); int count = writeCtx.getCount(); WriteStableHow stableHow = writeCtx.getStableHow(); - byte[] data = null; - try { - data = writeCtx.getData(); - } catch (Exception e1) { - LOG.error("Failed to get request data offset:" + offset + " count:" - + count + " error:" + e1); - // Cleanup everything - cleanup(); - return; - } - Preconditions.checkState(data.length == count); - FileHandle handle = writeCtx.getHandle(); if (LOG.isDebugEnabled()) { LOG.debug("do write, fileId: " + handle.getFileId() + " offset: " @@ -774,8 +817,8 @@ class OpenFileCtx { try { // The write is not protected by lock. asyncState is used to make sure - // there is one thread doing write back at any time - fos.write(data, 0, count); + // there is one thread doing write back at any time + writeCtx.writeData(fos); long flushedOffset = getFlushedOffset(); if (flushedOffset != (offset + count)) { @@ -784,10 +827,6 @@ class OpenFileCtx { + (offset + count)); } - if (LOG.isDebugEnabled()) { - LOG.debug("After writing " + handle.getFileId() + " at offset " - + offset + ", update the memory count."); - } // Reduce memory occupation size if request was allowed dumped if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) { @@ -795,6 +834,11 @@ class OpenFileCtx { if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) { writeCtx.setDataState(WriteCtx.DataState.NO_DUMP); updateNonSequentialWriteInMemory(-count); + if (LOG.isDebugEnabled()) { + LOG.debug("After writing " + handle.getFileId() + " at offset " + + offset + ", updated the memory count, new value:" + + nonSequentialWriteInMemory.get()); + } } } } @@ -802,6 +846,11 @@ class OpenFileCtx { if (!writeCtx.getReplied()) { WccAttr preOpAttr = latestAttr.getWccAttr(); WccData fileWcc = new WccData(preOpAttr, latestAttr); + if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) { + LOG.warn("Return original count:" + writeCtx.getOriginalCount() + + " instead of real data count:" + count); + count = writeCtx.getOriginalCount(); + } WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( @@ -809,7 +858,7 @@ class OpenFileCtx { } } catch (IOException e) { LOG.error("Error writing to fileId " + handle.getFileId() + " at offset " - + offset + " and length " + data.length, e); + + offset + " and length " + count, e); if (!writeCtx.getReplied()) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO); Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java?rev=1529738&r1=1529737&r2=1529738&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java (original) +++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java Mon Oct 7 03:47:30 2013 @@ -20,13 +20,16 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow; import org.jboss.netty.channel.Channel; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -50,8 +53,17 @@ class WriteCtx { private final FileHandle handle; private final long offset; private final int count; + + //Only needed for overlapped write, referring OpenFileCtx.addWritesToCache() + private final int originalCount; + public static final int INVALID_ORIGINAL_COUNT = -1; + + public int getOriginalCount() { + return originalCount; + } + private final WriteStableHow stableHow; - private volatile byte[] data; + private volatile ByteBuffer data; private final Channel channel; private final int xid; @@ -89,9 +101,13 @@ class WriteCtx { } return 0; } + + // Resized write should not allow dump + Preconditions.checkState(originalCount == INVALID_ORIGINAL_COUNT); + this.raf = raf; dumpFileOffset = dumpOut.getChannel().position(); - dumpOut.write(data, 0, count); + dumpOut.write(data.array(), 0, count); if (LOG.isDebugEnabled()) { LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset); } @@ -127,7 +143,8 @@ class WriteCtx { return stableHow; } - byte[] getData() throws IOException { + @VisibleForTesting + ByteBuffer getData() throws IOException { if (dataState != DataState.DUMPED) { synchronized (this) { if (dataState != DataState.DUMPED) { @@ -143,15 +160,45 @@ class WriteCtx { private void loadData() throws IOException { Preconditions.checkState(data == null); - data = new byte[count]; + byte[] rawData = new byte[count]; raf.seek(dumpFileOffset); - int size = raf.read(data, 0, count); + int size = raf.read(rawData, 0, count); if (size != count) { throw new IOException("Data count is " + count + ", but read back " + size + "bytes"); } + data = ByteBuffer.wrap(rawData); } + public void writeData(HdfsDataOutputStream fos) throws IOException { + Preconditions.checkState(fos != null); + + ByteBuffer dataBuffer = null; + try { + dataBuffer = getData(); + } catch (Exception e1) { + LOG.error("Failed to get request data offset:" + offset + " count:" + + count + " error:" + e1); + throw new IOException("Can't get WriteCtx.data"); + } + + byte[] data = dataBuffer.array(); + int position = dataBuffer.position(); + int limit = dataBuffer.limit(); + Preconditions.checkState(limit - position == count); + // Modified write has a valid original count + if (position != 0) { + if (limit != getOriginalCount()) { + throw new IOException("Modified write has differnt original size." + + "buff position:" + position + " buff limit:" + limit + ". " + + toString()); + } + } + + // Now write data + fos.write(data, position, count); + } + Channel getChannel() { return channel; } @@ -168,11 +215,13 @@ class WriteCtx { this.replied = replied; } - WriteCtx(FileHandle handle, long offset, int count, WriteStableHow stableHow, - byte[] data, Channel channel, int xid, boolean replied, DataState dataState) { + WriteCtx(FileHandle handle, long offset, int count, int originalCount, + WriteStableHow stableHow, ByteBuffer data, Channel channel, int xid, + boolean replied, DataState dataState) { this.handle = handle; this.offset = offset; this.count = count; + this.originalCount = originalCount; this.stableHow = stableHow; this.data = data; this.channel = channel; @@ -185,7 +234,7 @@ class WriteCtx { @Override public String toString() { return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count - + " stableHow:" + stableHow + " replied:" + replied + " dataState:" - + dataState + " xid:" + xid; + + " originalCount:" + originalCount + " stableHow:" + stableHow + + " replied:" + replied + " dataState:" + dataState + " xid:" + xid; } } \ No newline at end of file Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1529738&r1=1529737&r2=1529738&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Oct 7 03:47:30 2013 @@ -73,6 +73,9 @@ Release 2.1.2 - UNRELEASED HDFS-5299. DFS client hangs in updatePipeline RPC when failover happened. (Vinay via jing9) + HDFS-5259. Support client which combines appended data with old data + before sends it to NFS server. (brandonli) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES