From: drankye@apache.org
To: common-commits@hadoop.apache.org
Subject: hadoop git commit: HDFS-7889 Subclass DFSOutputStream to support writing striping layout files. Contributed by Li Bo
Date: Fri, 10 Apr 2015 09:07:30 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 0c0712ed5 -> edf44cf43

HDFS-7889 Subclass DFSOutputStream to support writing striping layout files. Contributed by Li Bo

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/edf44cf4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/edf44cf4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/edf44cf4

Branch: refs/heads/HDFS-7285
Commit: edf44cf437a1c32d2eaeb65b5381ecdae0663173
Parents: 0c0712e
Author: Kai Zheng
Authored: Sat Apr 11 01:03:37 2015 +0800
Committer: Kai Zheng
Committed: Sat Apr 11 01:03:37 2015 +0800

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   4 +-
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  13 +-
 .../java/org/apache/hadoop/hdfs/DFSPacket.java  |  26 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 439 +++++++++++++++++++
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  12 +-
 .../apache/hadoop/hdfs/StripedDataStreamer.java | 241 ++++++++++
 .../hadoop/hdfs/TestDFSStripedOutputStream.java | 311 +++++++++++++
 7 files changed, 1031 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/edf44cf4/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 1e695c4..753795a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -56,4 +56,6 @@ HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
-    HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
\ No newline at end of file
+    HDFS-8104.
Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng) + + HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/edf44cf4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index f6733e3..dacc722 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -267,8 +267,14 @@ public class DFSOutputStream extends FSOutputSummer } } Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); - final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, - flag, progress, checksum, favoredNodes); + final DFSOutputStream out; + if(stat.getReplication() == 0) { + out = new DFSStripedOutputStream(dfsClient, src, stat, + flag, progress, checksum, favoredNodes); + } else { + out = new DFSOutputStream(dfsClient, src, stat, + flag, progress, checksum, favoredNodes); + } out.start(); return out; } finally { @@ -345,6 +351,9 @@ public class DFSOutputStream extends FSOutputSummer String[] favoredNodes) throws IOException { TraceScope scope = dfsClient.getPathTraceScope("newStreamForAppend", src); + if(stat.getReplication() == 0) { + throw new IOException("Not support appending to a striping layout file yet."); + } try { final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, progress, lastBlock, stat, checksum); http://git-wip-us.apache.org/repos/asf/hadoop/blob/edf44cf4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java index 7e7f780..5add8e4 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs; import java.io.DataOutputStream; import java.io.IOException; import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.Arrays; @@ -111,6 +112,19 @@ class DFSPacket { dataPos += len; } + synchronized void writeData(ByteBuffer inBuffer, int len) + throws ClosedChannelException { + checkBuffer(); + len = len > inBuffer.remaining() ? 
inBuffer.remaining() : len; + if (dataPos + len > buf.length) { + throw new BufferOverflowException(); + } + for (int i = 0; i < len; i++) { + buf[dataPos + i] = inBuffer.get(); + } + dataPos += len; + } + /** * Write checksums to this packet * @@ -220,7 +234,7 @@ class DFSPacket { * * @return true if the packet is the last packet */ - boolean isLastPacketInBlock(){ + boolean isLastPacketInBlock() { return lastPacketInBlock; } @@ -229,7 +243,7 @@ class DFSPacket { * * @return the sequence number of this packet */ - long getSeqno(){ + long getSeqno() { return seqno; } @@ -238,14 +252,14 @@ class DFSPacket { * * @return the number of chunks in this packet */ - synchronized int getNumChunks(){ + synchronized int getNumChunks() { return numChunks; } /** * increase the number of chunks by one */ - synchronized void incNumChunks(){ + synchronized void incNumChunks() { numChunks++; } @@ -254,7 +268,7 @@ class DFSPacket { * * @return the maximum number of packets */ - int getMaxChunks(){ + int getMaxChunks() { return maxChunks; } @@ -263,7 +277,7 @@ class DFSPacket { * * @param syncBlock if to sync block */ - synchronized void setSyncBlock(boolean syncBlock){ + synchronized void setSyncBlock(boolean syncBlock) { this.syncBlock = syncBlock; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/edf44cf4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java new file mode 100644 index 0000000..aded4fe --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -0,0 +1,439 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Progressable; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + + +/**************************************************************** + * The DFSStripedOutputStream class supports writing files in striped + * layout. Each stripe contains a sequence of cells and multiple + * {@link StripedDataStreamer}s in DFSStripedOutputStream are responsible + * for writing the cells to different datanodes. + * + ****************************************************************/ + +@InterfaceAudience.Private +public class DFSStripedOutputStream extends DFSOutputStream { + + private final List streamers; + /** + * Size of each striping cell, must be a multiple of bytesPerChecksum + */ + private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private ByteBuffer[] cellBuffers; + private final short blockGroupBlocks = HdfsConstants.NUM_DATA_BLOCKS + + HdfsConstants.NUM_PARITY_BLOCKS; + private final short blockGroupDataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + private int curIdx = 0; + /* bytes written in current block group */ + private long currentBlockGroupBytes = 0; + + //TODO: Use ErasureCoder interface (HDFS-7781) + private RawErasureEncoder encoder; + + private StripedDataStreamer getLeadingStreamer() { + return streamers.get(0); + } + + private long getBlockGroupSize() { + return blockSize * HdfsConstants.NUM_DATA_BLOCKS; + } + + /** Construct a new output stream for creating a file. 
*/ + DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, + EnumSet flag, Progressable progress, + DataChecksum checksum, String[] favoredNodes) + throws IOException { + super(dfsClient, src, stat, flag, progress, checksum, favoredNodes); + DFSClient.LOG.info("Creating striped output stream"); + if (blockGroupBlocks <= 1) { + throw new IOException("The block group must contain more than one block."); + } + + cellBuffers = new ByteBuffer[blockGroupBlocks]; + List> stripeBlocks = new ArrayList<>(); + + for (int i = 0; i < blockGroupBlocks; i++) { + stripeBlocks.add(new LinkedBlockingQueue(blockGroupBlocks)); + try { + cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize)); + } catch (InterruptedException ie) { + final InterruptedIOException iioe = new InterruptedIOException( + "create cell buffers"); + iioe.initCause(ie); + throw iioe; + } + } + encoder = new RSRawEncoder(); + encoder.initialize(blockGroupDataBlocks, + blockGroupBlocks - blockGroupDataBlocks, cellSize); + + streamers = new ArrayList<>(blockGroupBlocks); + for (short i = 0; i < blockGroupBlocks; i++) { + StripedDataStreamer streamer = new StripedDataStreamer(stat, null, + dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, + i, stripeBlocks); + if (favoredNodes != null && favoredNodes.length != 0) { + streamer.setFavoredNodes(favoredNodes); + } + streamers.add(streamer); + } + + refreshStreamer(); + } + + private void refreshStreamer() { + streamer = streamers.get(curIdx); + } + + private void moveToNextStreamer() { + curIdx = (curIdx + 1) % blockGroupBlocks; + refreshStreamer(); + } + + /** + * encode the buffers. + * After encoding, flip each buffer. + * + * @param buffers data buffers + parity buffers + */ + private void encode(ByteBuffer[] buffers) { + ByteBuffer[] dataBuffers = new ByteBuffer[blockGroupDataBlocks]; + ByteBuffer[] parityBuffers = new ByteBuffer[blockGroupBlocks - blockGroupDataBlocks]; + for (int i = 0; i < blockGroupBlocks; i++) { + if (i < blockGroupDataBlocks) { + dataBuffers[i] = buffers[i]; + } else { + parityBuffers[i - blockGroupDataBlocks] = buffers[i]; + } + } + encoder.encode(dataBuffers, parityBuffers); + } + + /** + * Generate packets from a given buffer + * + * @param byteBuffer the given buffer to generate packets + * @return packets generated + * @throws IOException + */ + private List generatePackets(ByteBuffer byteBuffer) + throws IOException{ + List packets = new ArrayList<>(); + while (byteBuffer.remaining() > 0) { + DFSPacket p = createPacket(packetSize, chunksPerPacket, + streamer.getBytesCurBlock(), + streamer.getAndIncCurrentSeqno(), false); + int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum; + int toWrite = byteBuffer.remaining() > maxBytesToPacket ? 
+ maxBytesToPacket: byteBuffer.remaining(); + p.writeData(byteBuffer, toWrite); + streamer.incBytesCurBlock(toWrite); + packets.add(p); + } + return packets; + } + + @Override + protected synchronized void writeChunk(byte[] b, int offset, int len, + byte[] checksum, int ckoff, int cklen) throws IOException { + super.writeChunk(b, offset, len, checksum, ckoff, cklen); + + if (getSizeOfCellnBuffer(curIdx) <= cellSize) { + addToCellBuffer(b, offset, len); + } else { + String msg = "Writing a chunk should not overflow the cell buffer."; + DFSClient.LOG.info(msg); + throw new IOException(msg); + } + + + // If current packet has not been enqueued for transmission, + // but the cell buffer is full, we need to enqueue the packet + if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) { + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("DFSClient writeChunk cell buffer full seqno=" + + currentPacket.getSeqno() + + ", curIdx=" + curIdx + + ", src=" + src + + ", bytesCurBlock=" + streamer.getBytesCurBlock() + + ", blockSize=" + blockSize + + ", appendChunk=" + streamer.getAppendChunk()); + } + streamer.waitAndQueuePacket(currentPacket); + currentPacket = null; + adjustChunkBoundary(); + endBlock(); + } + + // Two extra steps are needed when a striping cell is full: + // 1. Forward the current index pointer + // 2. Generate parity packets if a full stripe of data cells are present + if (getSizeOfCellnBuffer(curIdx) == cellSize) { + //move curIdx to next cell + moveToNextStreamer(); + //When all data cells in a stripe are ready, we need to encode + //them and generate some parity cells. These cells will be + //converted to packets and put to their DataStreamer's queue. + if (curIdx == blockGroupDataBlocks) { + //encode the data cells + for (int k = 0; k < blockGroupDataBlocks; k++) { + cellBuffers[k].flip(); + } + encode(cellBuffers); + for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) { + ByteBuffer parityBuffer = cellBuffers[i]; + List packets = generatePackets(parityBuffer); + for (DFSPacket p : packets) { + currentPacket = p; + streamer.waitAndQueuePacket(currentPacket); + currentPacket = null; + } + endBlock(); + moveToNextStreamer(); + } + //read next stripe to cellBuffers + clearCellBuffers(); + } + } + } + + private void addToCellBuffer(byte[] b, int off, int len) { + cellBuffers[curIdx].put(b, off, len); + } + + private int getSizeOfCellnBuffer(int cellIndex) { + return cellBuffers[cellIndex].position(); + } + + private void clearCellBuffers() { + for (int i = 0; i< blockGroupBlocks; i++) { + cellBuffers[i].clear(); + } + } + + private int stripeDataSize() { + return blockGroupDataBlocks * cellSize; + } + + private void notSupported(String headMsg) + throws IOException{ + throw new IOException( + headMsg + " is now not supported for striping layout."); + } + + @Override + public void hflush() throws IOException { + notSupported("hflush"); + } + + @Override + public void hsync() throws IOException { + notSupported("hsync"); + } + + + @Override + protected synchronized void start() { + for (StripedDataStreamer streamer : streamers) { + streamer.start(); + } + } + + @Override + synchronized void abort() throws IOException { + if (isClosed()) { + return; + } + for (StripedDataStreamer streamer : streamers) { + streamer.setLastException(new IOException("Lease timeout of " + + (dfsClient.getHdfsTimeout()/1000) + " seconds expired.")); + } + closeThreads(true); + dfsClient.endFileLease(fileId); + } + + //TODO: Handle slow writers (HDFS-7786) + //Cuurently only 
check if the leading streamer is terminated + boolean isClosed() { + return closed || getLeadingStreamer().streamerClosed(); + } + + // shutdown datastreamer and responseprocessor threads. + // interrupt datastreamer if force is true + @Override + protected void closeThreads(boolean force) throws IOException { + StripedDataStreamer leadingStreamer = null; + for (StripedDataStreamer streamer : streamers) { + try { + streamer.close(force); + streamer.join(); + streamer.closeSocket(); + if (streamer.isLeadingStreamer()) { + leadingStreamer = streamer; + } else { + streamer.countTailingBlockGroupBytes(); + } + + } catch (InterruptedException e) { + throw new IOException("Failed to shutdown streamer"); + } finally { + streamer.setSocketToNull(); + setClosed(); + } + } + leadingStreamer.countTailingBlockGroupBytes(); + } + + @Override + public synchronized void write(int b) throws IOException { + super.write(b); + currentBlockGroupBytes = (currentBlockGroupBytes + 1) % getBlockGroupSize(); + } + + @Override + public synchronized void write(byte b[], int off, int len) + throws IOException { + super.write(b, off, len); + currentBlockGroupBytes = (currentBlockGroupBytes + len) % getBlockGroupSize(); + } + + private void writeParityCellsForLastStripe() throws IOException{ + if(currentBlockGroupBytes == 0 || + currentBlockGroupBytes % stripeDataSize() == 0) + return; + int lastStripeLen =(int)(currentBlockGroupBytes % stripeDataSize()); + // Size of parity cells should equal the size of the first cell, if it + // is not full. + int parityCellSize = cellSize; + int index = lastStripeLen / cellSize; + if (lastStripeLen < cellSize) { + parityCellSize = lastStripeLen; + index++; + } + for (int i = 0; i < blockGroupBlocks; i++) { + if (i >= index) { + int position = cellBuffers[i].position(); + for (int j = 0; j < parityCellSize - position; j++) { + cellBuffers[i].put((byte)0); + } + } + cellBuffers[i].flip(); + } + encode(cellBuffers); + + //write parity cells + curIdx = blockGroupDataBlocks; + refreshStreamer(); + for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) { + ByteBuffer parityBuffer = cellBuffers[i]; + List packets = generatePackets(parityBuffer); + for (DFSPacket p : packets) { + currentPacket = p; + streamer.waitAndQueuePacket(currentPacket); + currentPacket = null; + } + endBlock(); + moveToNextStreamer(); + } + + clearCellBuffers(); + } + + @Override + void setClosed() { + super.setClosed(); + for (int i = 0; i < blockGroupBlocks; i++) { + byteArrayManager.release(cellBuffers[i].array()); + streamers.get(i).release(); + } + } + + @Override + protected synchronized void closeImpl() throws IOException { + if (isClosed()) { + IOException e = getLeadingStreamer().getLastException().getAndSet(null); + if (e == null) + return; + else + throw e; + } + + try { + // flush from all upper layers + flushBuffer(); + if (currentPacket != null) { + streamer.waitAndQueuePacket(currentPacket); + currentPacket = null; + } + //if the last stripe is incomplete, generate and write parity cells + writeParityCellsForLastStripe(); + + for (int i = 0; i < blockGroupBlocks; i++) { + curIdx = i; + refreshStreamer(); + if (streamer.getBytesCurBlock()!= 0 || + currentBlockGroupBytes < getBlockGroupSize()) { + // send an empty packet to mark the end of the block + currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), + streamer.getAndIncCurrentSeqno(), true); + currentPacket.setSyncBlock(shouldSyncBlock); + } + // flush all data to Datanode + flushInternal(); + } + + // get last block before 
destroying the streamer + ExtendedBlock lastBlock = streamers.get(0).getBlock(); + closeThreads(false); + TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); + try { + completeFile(lastBlock); + } finally { + scope.close(); + } + dfsClient.endFileLease(fileId); + } catch (ClosedChannelException e) { + } finally { + setClosed(); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/edf44cf4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 6bcbfde..51ae9f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -163,7 +163,7 @@ class DataStreamer extends Daemon { } private volatile boolean streamerClosed = false; - private ExtendedBlock block; // its length is number of bytes acked + protected ExtendedBlock block; // its length is number of bytes acked private Token accessToken; private DataOutputStream blockStream; private DataInputStream blockReplyStream; @@ -171,7 +171,7 @@ class DataStreamer extends Daemon { private volatile DatanodeInfo[] nodes = null; // list of targets for current block private volatile StorageType[] storageTypes = null; private volatile String[] storageIDs = null; - private String[] favoredNodes; + protected String[] favoredNodes; volatile boolean hasError = false; volatile int errorIndex = -1; // Restarting node index @@ -198,12 +198,12 @@ class DataStreamer extends Daemon { private final AtomicReference lastException = new AtomicReference<>(); private Socket s; - private final DFSClient dfsClient; - private final String src; + protected final DFSClient dfsClient; + protected final String src; /** Only for DataTransferProtocol.writeBlock(..) */ private final DataChecksum checksum4WriteBlock; private final Progressable progress; - private final HdfsFileStatus stat; + protected final HdfsFileStatus stat; // appending to existing partial block private volatile boolean appendChunk = false; // both dataQueue and ackQueue are protected by dataQueue lock @@ -326,7 +326,7 @@ class DataStreamer extends Daemon { stage = BlockConstructionStage.DATA_STREAMING; } - private void endBlock() { + protected void endBlock() { if(DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Closing old block " + block); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/edf44cf4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java new file mode 100644 index 0000000..710d92d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import java.util.List; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.util.ByteArrayManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Progressable; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/**************************************************************************** + * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}. + * There are two kinds of StripedDataStreamer, leading streamer and ordinary + * stream. Leading streamer requests a block group from NameNode, unwraps + * it to located blocks and transfers each located block to its corresponding + * ordinary streamer via a blocking queue. 
+ * + ****************************************************************************/ +public class StripedDataStreamer extends DataStreamer { + private final short index; + private final List> stripedBlocks; + private static short blockGroupSize = HdfsConstants.NUM_DATA_BLOCKS + + HdfsConstants.NUM_PARITY_BLOCKS; + private boolean hasCommittedBlock = false; + + StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block, + DFSClient dfsClient, String src, + Progressable progress, DataChecksum checksum, + AtomicReference cachingStrategy, + ByteArrayManager byteArrayManage, short index, + List> stripedBlocks) { + super(stat,block, dfsClient, src, progress, checksum, cachingStrategy, + byteArrayManage); + this.index = index; + this.stripedBlocks = stripedBlocks; + } + + /** + * Construct a data streamer for appending to the last partial block + * @param lastBlock last block of the file to be appended + * @param stat status of the file to be appended + * @throws IOException if error occurs + */ + StripedDataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, + DFSClient dfsClient, String src, + Progressable progress, DataChecksum checksum, + AtomicReference cachingStrategy, + ByteArrayManager byteArrayManage, short index, + List> stripedBlocks) + throws IOException { + super(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy, + byteArrayManage); + this.index = index; + this.stripedBlocks = stripedBlocks; + } + + public boolean isLeadingStreamer () { + return index == 0; + } + + private boolean isParityStreamer() { + return index >= HdfsConstants.NUM_DATA_BLOCKS; + } + + @Override + protected void endBlock() { + if (!isLeadingStreamer() && !isParityStreamer()) { + //before retrieving a new block, transfer the finished block to + //leading streamer + LocatedBlock finishedBlock = new LocatedBlock( + new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(), + block.getNumBytes(),block.getGenerationStamp()), null); + try{ + boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30, + TimeUnit.SECONDS); + }catch (InterruptedException ie) { + //TODO: Handle InterruptedException (HDFS-7786) + } + } + super.endBlock(); + } + + /** + * This function is called after the streamer is closed. + */ + void countTailingBlockGroupBytes () throws IOException { + if (isLeadingStreamer()) { + //when committing a block group, leading streamer has to adjust + // {@link block} including the size of block group + for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) { + try { + LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30, + TimeUnit.SECONDS); + if (finishedLocatedBlock == null) { + throw new IOException("Fail to get finished LocatedBlock " + + "from streamer, i=" + i); + } + ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock(); + long bytes = finishedBlock == null ? 
0 : finishedBlock.getNumBytes(); + if (block != null) { + block.setNumBytes(block.getNumBytes() + bytes); + } + } catch (InterruptedException ie) { + DFSClient.LOG.info("InterruptedException received when " + + "putting a block to stripeBlocks, ie = " + ie); + } + } + } else if (!isParityStreamer()) { + if (block == null || block.getNumBytes() == 0) { + LocatedBlock finishedBlock = new LocatedBlock(null, null); + try { + boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30, + TimeUnit.SECONDS); + } catch (InterruptedException ie) { + //TODO: Handle InterruptedException (HDFS-7786) + ie.printStackTrace(); + } + } + } + + } + + @Override + protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) + throws IOException { + LocatedBlock lb = null; + if (isLeadingStreamer()) { + if(hasCommittedBlock) { + //when committing a block group, leading streamer has to adjust + // {@link block} including the size of block group + for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) { + try { + LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30, + TimeUnit.SECONDS); + if (finishedLocatedBlock == null) { + throw new IOException("Fail to get finished LocatedBlock " + + "from streamer, i=" + i); + } + ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock(); + long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes(); + if(block != null) { + block.setNumBytes(block.getNumBytes() + bytes); + } + } catch (InterruptedException ie) { + DFSClient.LOG.info("InterruptedException received when putting" + + " a block to stripeBlocks, ie = " + ie); + } + } + } + + lb = super.locateFollowingBlock(excludedNodes); + hasCommittedBlock = true; + LocatedBlock[] blocks = unwrapBlockGroup(lb); + assert blocks.length == blockGroupSize : + "Fail to get block group from namenode: blockGroupSize: " + + blockGroupSize + ", blocks.length: " + blocks.length; + lb = blocks[0]; + for (int i = 1; i < blocks.length; i++) { + try { + boolean offSuccess = stripedBlocks.get(i).offer(blocks[i], + 90, TimeUnit.SECONDS); + if(!offSuccess){ + String msg = "Fail to put block to stripeBlocks. i = " + i; + DFSClient.LOG.info(msg); + throw new IOException(msg); + } else { + DFSClient.LOG.info("Allocate a new block to a streamer. i = " + i + + ", block: " + blocks[i]); + } + } catch (InterruptedException ie) { + DFSClient.LOG.info("InterruptedException received when putting" + + " a block to stripeBlocks, ie = " + ie); + } + } + } else { + try { + //wait 90 seconds to get a block from the queue + lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + DFSClient.LOG.info("InterruptedException received when retrieving " + + "a block from stripeBlocks, ie = " + ie); + } + } + return lb; + } + + /** + * Generate other blocks in a block group according to the first one. 
+ * + * @param firstBlockInGroup the first block in a block group + * @return other blocks in this group + */ + public static LocatedBlock[] unwrapBlockGroup( + final LocatedBlock firstBlockInGroup) { + ExtendedBlock eb = firstBlockInGroup.getBlock(); + DatanodeInfo[] locs = firstBlockInGroup.getLocations(); + String[] storageIDs = firstBlockInGroup.getStorageIDs(); + StorageType[] storageTypes = firstBlockInGroup.getStorageTypes(); + Token blockToken = firstBlockInGroup.getBlockToken(); + LocatedBlock[] blocksInGroup = new LocatedBlock[locs.length]; + for (int i = 0; i < blocksInGroup.length; i++) { + //each block in a group has the same number of bytes and timestamp + ExtendedBlock extendedBlock = new ExtendedBlock(eb.getBlockPoolId(), + eb.getBlockId() + i, eb.getNumBytes(), eb.getGenerationStamp()); + blocksInGroup[i] = new LocatedBlock(extendedBlock, + new DatanodeInfo[] {locs[i]}, new String[]{storageIDs[i]}, + new StorageType[] {storageTypes[i]}); + blocksInGroup[i].setBlockToken(blockToken); + } + return blocksInGroup; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/edf44cf4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java new file mode 100644 index 0000000..f5a37f3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -0,0 +1,311 @@ +package org.apache.hadoop.hdfs; + +import java.util.Arrays; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; + +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; + +public class TestDFSStripedOutputStream { + private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + + private MiniDFSCluster cluster; + private Configuration conf = new Configuration(); + private DistributedFileSystem fs; + int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + int blockSize = 8 * 1024 * 1024; + int cellsInBlock = blockSize / cellSize; + private int mod = 29; + + @Before + public void setup() throws IOException { + int numDNs = dataBlocks + parityBlocks + 2; + Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 
cellsInBlock * cellSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().createErasureCodingZone("/"); + fs = cluster.getFileSystem(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void TestFileEmpty() throws IOException { + testOneFile("/EmptyFile", 0); + } + + @Test + public void TestFileSmallerThanOneCell1() throws IOException { + testOneFile("/SmallerThanOneCell", 1); + } + + @Test + public void TestFileSmallerThanOneCell2() throws IOException { + testOneFile("/SmallerThanOneCell", cellSize - 1); + } + + @Test + public void TestFileEqualsWithOneCell() throws IOException { + testOneFile("/EqualsWithOneCell", cellSize); + } + + @Test + public void TestFileSmallerThanOneStripe1() throws IOException { + testOneFile("/SmallerThanOneStripe", cellSize * dataBlocks - 1); + } + + @Test + public void TestFileSmallerThanOneStripe2() throws IOException { + testOneFile("/SmallerThanOneStripe", cellSize + 123); + } + + @Test + public void TestFileEqualsWithOneStripe() throws IOException { + testOneFile("/EqualsWithOneStripe", cellSize * dataBlocks); + } + + @Test + public void TestFileMoreThanOneStripe1() throws IOException { + testOneFile("/MoreThanOneStripe1", cellSize * dataBlocks + 123); + } + + @Test + public void TestFileMoreThanOneStripe2() throws IOException { + testOneFile("/MoreThanOneStripe2", + cellSize * dataBlocks * (cellsInBlock >= 2 ? cellsInBlock / 2 : 1) + + cellSize * dataBlocks + 123); + } + + @Test + public void TestFileFullBlockGroup() throws IOException { + testOneFile("/FullBlockGroup", blockSize * dataBlocks); + } + + //TODO: The following tests will pass after HDFS-8121 fixed +// @Test + public void TestFileMoreThanABlockGroup1() throws IOException { + testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123); + } + + // @Test + public void TestFileMoreThanABlockGroup2() throws IOException { + testOneFile("/MoreThanABlockGroup2", + blockSize * dataBlocks * 3 + + (cellsInBlock >= 2 ? cellsInBlock / 2 : 1) * cellSize * dataBlocks + + 123); + } + + private int stripeDataSize() { + return cellSize * dataBlocks; + } + + private byte[] generateBytes(int cnt) { + byte[] bytes = new byte[cnt]; + for (int i = 0; i < cnt; i++) { + bytes[i] = getByte(i); + } + return bytes; + } + + private byte getByte(long pos) { + return (byte) (pos % mod + 1); + } + + private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes) + throws IOException { + Path TestPath = new Path(src); + byte[] bytes = generateBytes(writeBytes); + DFSTestUtil.writeFile(fs, TestPath, new String(bytes)); + + //check file length + FileStatus status = fs.getFileStatus(TestPath); + long fileLength = status.getLen(); + if (fileLength != writeBytes) { + Assert.fail("File Length error: expect=" + writeBytes + + ", actual=" + fileLength); + } + + DFSStripedInputStream dis = new DFSStripedInputStream( + fs.getClient(), src, true); + byte[] buf = new byte[writeBytes + 100]; + int readLen = dis.read(0, buf, 0, buf.length); + readLen = readLen >= 0 ? 
readLen : 0; + if (readLen != writeBytes) { + Assert.fail("The length of file is not correct."); + } + + for (int i = 0; i < writeBytes; i++) { + if (getByte(i) != buf[i]) { + Assert.fail("Byte at i = " + i + " is wrongly written."); + } + } + } + + private void testOneFile(String src, int writeBytes) + throws IOException { + Path TestPath = new Path(src); + + int allBlocks = dataBlocks + parityBlocks; + byte[] bytes = generateBytes(writeBytes); + DFSTestUtil.writeFile(fs, TestPath, new String(bytes)); + + //check file length + FileStatus status = fs.getFileStatus(TestPath); + long fileLength = status.getLen(); + if (fileLength != writeBytes) { + Assert.fail("File Length error: expect=" + writeBytes + + ", actual=" + fileLength); + } + + List> blockGroupList = new ArrayList<>(); + LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L); + + for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) { + LocatedBlock[] blocks = StripedDataStreamer.unwrapBlockGroup(firstBlock); + List oneGroup = Arrays.asList(blocks); + blockGroupList.add(oneGroup); + } + + //test each block group + for (int group = 0; group < blockGroupList.size(); group++) { + //get the data of this block + List blockList = blockGroupList.get(group); + byte[][] dataBlockBytes = new byte[dataBlocks][]; + byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][]; + + //calculate the size of this block group + int lenOfBlockGroup = group < blockGroupList.size() - 1 ? + blockSize * dataBlocks : + writeBytes - blockSize * (blockGroupList.size() - 1) * dataBlocks; + int intactStripes = lenOfBlockGroup / stripeDataSize(); + int lastStripeLen = lenOfBlockGroup % stripeDataSize(); + + //for each block, use BlockReader to read data + for (int i = 0; i < blockList.size(); i++) { + LocatedBlock lblock = blockList.get(i); + if (lblock == null) { + continue; + } + DatanodeInfo[] nodes = lblock.getLocations(); + ExtendedBlock block = lblock.getBlock(); + InetSocketAddress targetAddr = NetUtils.createSocketAddr( + nodes[0].getXferAddr()); + + int lenOfCell = cellSize; + if (i == lastStripeLen / cellSize) { + lenOfCell = lastStripeLen % cellSize; + } else if (i > lastStripeLen / cellSize) { + lenOfCell = 0; + } + int lenOfBlock = cellSize * intactStripes + lenOfCell; + byte[] blockBytes = new byte[lenOfBlock]; + if (i < dataBlocks) { + dataBlockBytes[i] = blockBytes; + } else { + parityBlockBytes[i - dataBlocks] = blockBytes; + } + + if (lenOfBlock == 0) { + continue; + } + + block.setNumBytes(lenOfBlock); + BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)). + setFileName(src). + setBlock(block). + setBlockToken(lblock.getBlockToken()). + setInetSocketAddress(targetAddr). + setStartOffset(0). + setLength(block.getNumBytes()). + setVerifyChecksum(true). + setClientName("TestStripeLayoutWrite"). + setDatanodeInfo(nodes[0]). + setCachingStrategy(CachingStrategy.newDefaultStrategy()). + setClientCacheContext(ClientContext.getFromConf(conf)). + setConfiguration(conf). 
+ setRemotePeerFactory(new RemotePeerFactory() { + @Override + public Peer newConnectedPeer(InetSocketAddress addr, + Token blockToken, + DatanodeID datanodeId) + throws IOException { + Peer peer = null; + Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); + try { + sock.connect(addr, HdfsServerConstants.READ_TIMEOUT); + sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); + peer = TcpPeerServer.peerFromSocket(sock); + } finally { + if (peer == null) { + IOUtils.closeSocket(sock); + } + } + return peer; + } + }).build(); + + blockReader.readAll(blockBytes, 0, lenOfBlock); + blockReader.close(); + } + + //check if we write the data correctly + for (int i = 0; i < dataBlockBytes.length; i++) { + byte[] cells = dataBlockBytes[i]; + if (cells == null) { + continue; + } + for (int j = 0; j < cells.length; j++) { + byte expected; + //calculate the postion of this byte in the file + long pos = group * dataBlocks * blockSize + + (i * cellSize + j / cellSize * cellSize * dataBlocks) + + j % cellSize; + if (pos >= writeBytes) { + expected = 0; + } else { + expected = getByte(pos); + } + + if (expected != cells[j]) { + Assert.fail("Unexpected byte " + cells[j] + ", expect " + expected + + ". Block group index is " + group + + ", stripe index is " + j / cellSize + + ", cell index is " + i + ", byte index is " + j % cellSize); + } + } + } + } + } + +}
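
As a reading aid for the cell layout described in the DFSStripedOutputStream javadoc and checked byte-by-byte in testOneFile(), here is a minimal, self-contained sketch of the offset arithmetic. It assumes round-robin placement of fixed-size cells across the data blocks of a group; the class and method names are illustrative and are not part of the patch.

public class StripedLayoutExample {
  // Maps a logical offset inside one block group to the data block it lands in
  // and the offset inside that block, following the round-robin cell placement
  // used by DFSStripedOutputStream. cellSize and dataBlocks stand in for
  // HdfsConstants.BLOCK_STRIPED_CELL_SIZE and HdfsConstants.NUM_DATA_BLOCKS.
  static long[] locate(long offsetInGroup, int cellSize, int dataBlocks) {
    long stripeSize = (long) cellSize * dataBlocks;   // data bytes per stripe
    long stripeIndex = offsetInGroup / stripeSize;    // which stripe of the group
    long inStripe = offsetInGroup % stripeSize;       // offset inside that stripe
    long blockIndex = inStripe / cellSize;            // which data block gets the byte
    long offsetInCell = inStripe % cellSize;          // offset inside the current cell
    return new long[] { blockIndex, stripeIndex * cellSize + offsetInCell };
  }

  public static void main(String[] args) {
    // Example with 64 KB cells and 6 data blocks (illustrative values only):
    // byte 200,000 of a block group lands in data block 3 at block offset 3,392.
    long[] loc = locate(200_000L, 64 * 1024, 6);
    System.out.println("block=" + loc[0] + ", offsetInBlock=" + loc[1]);
  }
}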
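
writeParityCellsForLastStripe() zero-pads short data cells before generating parity for an incomplete final stripe. The sketch below shows that padding together with the same raw encoder calls the patch uses (RSRawEncoder initialize and encode); the cell size, block counts, and payload are illustrative values, not the stream's actual code path.

import java.nio.ByteBuffer;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;

public class LastStripeEncodeExample {
  public static void main(String[] args) {
    final int dataBlocks = 6;        // e.g. HdfsConstants.NUM_DATA_BLOCKS
    final int parityBlocks = 3;      // e.g. HdfsConstants.NUM_PARITY_BLOCKS
    final int parityCellSize = 1024; // size of the largest cell in the last stripe

    RSRawEncoder encoder = new RSRawEncoder();
    encoder.initialize(dataBlocks, parityBlocks, parityCellSize);

    ByteBuffer[] dataCells = new ByteBuffer[dataBlocks];
    ByteBuffer[] parityCells = new ByteBuffer[parityBlocks];
    for (int i = 0; i < dataBlocks; i++) {
      dataCells[i] = ByteBuffer.allocate(parityCellSize);
      if (i == 0) {
        dataCells[i].put("partial last stripe".getBytes()); // only cell 0 holds data here
      }
      // Zero-pad every data cell to parityCellSize, as the stream does for a short
      // final stripe, then flip so the encoder reads the full cell.
      dataCells[i].position(parityCellSize);
      dataCells[i].flip();
    }
    for (int i = 0; i < parityBlocks; i++) {
      parityCells[i] = ByteBuffer.allocate(parityCellSize);
    }

    encoder.encode(dataCells, parityCells); // parityCells now hold the parity data
  }
}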
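
The StripedDataStreamer javadoc describes the leading streamer unwrapping a block group and handing each located block to the ordinary streamer with the matching index through a blocking queue. A minimal sketch of that producer/consumer hand-off follows; the queue type and the 90-second poll mirror the patch, while the block objects and thread bodies are simplified stand-ins.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class StreamerHandoffExample {
  public static void main(String[] args) throws Exception {
    final int numStreamers = 9; // e.g. 6 data + 3 parity
    // One queue per streamer index, as in DFSStripedOutputStream's stripeBlocks.
    final List<BlockingQueue<String>> stripeBlocks = new ArrayList<>();
    for (int i = 0; i < numStreamers; i++) {
      stripeBlocks.add(new LinkedBlockingQueue<>(numStreamers));
    }

    // Leading streamer (index 0): allocates the group and distributes the blocks.
    Thread leading = new Thread(() -> {
      for (int i = 1; i < numStreamers; i++) {
        stripeBlocks.get(i).offer("block-" + i); // stand-in for a LocatedBlock
      }
    });

    // An ordinary streamer (index 4): waits up to 90 seconds for its block.
    Thread ordinary = new Thread(() -> {
      try {
        String block = stripeBlocks.get(4).poll(90, TimeUnit.SECONDS);
        System.out.println("streamer 4 got " + block);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    ordinary.start();
    leading.start();
    leading.join();
    ordinary.join();
  }
}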