hadoop-hdfs-dev mailing list archives

From "Sirianni, Eric" <Eric.Siria...@netapp.com>
Subject RE: mapred replication
Date Mon, 19 Aug 2013 18:01:57 GMT
I should have mentioned that the IOException is from the DataNode (if not obvious from the
stacktrace :)).  The DFSClient continues along happily by fetching the block from a different
replica (as one would expect).
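
To illustrate the client-side behavior described above, here is a minimal sketch of "try the next replica when one fails". This is not the DFSClient code: the BlockReader interface, the readFromAnyReplica method and the datanode names are hypothetical, chosen only to show the shape of the failover loop.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class ReplicaFailoverSketch {
    /** Hypothetical stand-in for reading a block from one datanode. */
    interface BlockReader {
        byte[] read(String datanode) throws IOException;
    }

    /** Try each reported location in order; return the first successful read. */
    static byte[] readFromAnyReplica(List<String> locations, BlockReader reader)
            throws IOException {
        IOException last = null;
        for (String dn : locations) {
            try {
                return reader.read(dn);
            } catch (IOException e) {
                // Stale or dead replica: remember the failure and move on.
                last = e;
            }
        }
        throw new IOException("no usable replica among " + locations, last);
    }

    public static void main(String[] args) throws IOException {
        // Assumption for the demo: dn1 has already deleted its replica.
        BlockReader reader = dn -> {
            if (dn.equals("dn1")) {
                throw new IOException("Block is not valid.");
            }
            return ("data-from-" + dn).getBytes();
        };
        byte[] data = readFromAnyReplica(Arrays.asList("dn1", "dn2", "dn3"), reader);
        System.out.println(new String(data));
    }
}
```

In this toy model the failed read from dn1 is swallowed by the client and only surfaces as a log line on the datanode side, which matches what we observed.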

-----Original Message-----
From: Sirianni, Eric [mailto:Eric.Sirianni@netapp.com] 
Sent: Monday, August 19, 2013 10:37 AM
To: hdfs-dev@hadoop.apache.org
Subject: RE: mapred replication

Thanks Chris and others for the detailed explanation.

I was aware of the basic rationale behind having a higher replication factor for mapred job
files - thanks for taking the time to elaborate and share with those on this list.

After thinking about it a bit more offline, I too speculated that the choice of altering the
rep factor post file-create was to limit the size of the client write pipeline and use background
inter-datanode replication to create the additional replicas.  Thanks for confirming that.
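
For the misconfiguration that started this thread, two of the checks Chris suggested (warn, or fail, when mapred.submit.replication < dfs.replication) might look roughly like the sketch below. It is plain Java with no Hadoop dependency; the class, enum and method names are hypothetical and the values are passed in as ints rather than read from a real Configuration.

```java
public class SubmitReplicationCheck {
    /** Hypothetical policy switch: warn about the mismatch or reject it. */
    enum Policy { WARN, FAIL }

    /**
     * Returns null when the settings are consistent, a warning message when
     * submitReplication < dfsReplication under WARN, and throws under FAIL.
     */
    static String check(int dfsReplication, int submitReplication, Policy policy) {
        if (submitReplication >= dfsReplication) {
            return null;
        }
        String msg = "mapred.submit.replication (" + submitReplication
                + ") < dfs.replication (" + dfsReplication + ")";
        if (policy == Policy.FAIL) {
            throw new IllegalArgumentException(msg);
        }
        return msg;
    }

    public static void main(String[] args) {
        // Consistent settings: nothing to report.
        System.out.println(check(3, 10, Policy.WARN));
        // Our accidental configuration: submit replication below the default.
        System.out.println(check(3, 2, Policy.WARN));
    }
}
```

Either policy would have flagged our configuration at job submission time instead of leaving it to show up as excess replica pruning later.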

Regarding the IOExceptions, here is the stack trace in question:

WARN org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(,
id=DS-1611001133, infoPort=50075, ipcPort=50020):Got exception while serving blk_7363978388743975861_1030
to /
java.io.IOException: Block blk_7363978388743975861_1030 is not valid.
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.getBlockFile(FSDataset.java:1059)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.getLength(FSDataset.java:1022)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.getVisibleLength(FSDataset.java:1032)
        at org.apache.hadoop.hdfs.server.datanode.BlockSender.<init>(BlockSender.java:115)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:194)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:104)
        at java.lang.Thread.run(Thread.java:679)

I also speculated that this is due to the way that invalidates are processed asynchronously
at the namenode.  A quick look at the chooseExcessReplicates()->addToInvalidates() path
seems to indicate that the NameNode does not actually remove the pruned replica from the BlocksMap
until the subsequent blockReport is received.  This can leave a substantial window where the
NameNode can return bogus replica locations to clients.

There is another code path, FSNamesystem.invalidateBlock(), that does proactively update the
BlocksMap (via FSNamesystem.removeStoredBlock()) after updating the recentInvalidateSets.
Perhaps the excess replica pruning path should include such a BlocksMap update as well?  Or
even better, just push the common BlocksMap removal into the addToInvalidates() method so
that all callers get that behavior.

Maybe I'm missing something - is there a legitimate reason for the NameNode to keep a replica's
metadata in the BlocksMap after it has already decided to invalidate said replica?
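
To make the question concrete, here is a toy model of the two behaviors. It is only a sketch of my reading of the code, not the actual FSNamesystem implementation: the maps are simplified stand-ins for the BlocksMap and recentInvalidateSets, and addToInvalidatesAndRemove is a hypothetical name for the proposed variant.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class InvalidateSketch {
    // block id -> datanodes the toy NameNode believes hold a replica
    final Map<String, Set<String>> blocksMap = new HashMap<>();
    // datanode -> blocks queued for deletion on that node
    final Map<String, Set<String>> recentInvalidates = new HashMap<>();

    void addReplica(String block, String dn) {
        blocksMap.computeIfAbsent(block, k -> new LinkedHashSet<>()).add(dn);
    }

    /** Queue only: models the excess-replica path as described above. */
    void addToInvalidates(String block, String dn) {
        recentInvalidates.computeIfAbsent(dn, k -> new LinkedHashSet<>()).add(block);
    }

    /** Proposed variant: queue the deletion and drop the location right away. */
    void addToInvalidatesAndRemove(String block, String dn) {
        addToInvalidates(block, dn);
        Set<String> dns = blocksMap.get(block);
        if (dns != null) {
            dns.remove(dn);
        }
    }

    /** Locations the toy NameNode would hand to a client. */
    Set<String> getLocations(String block) {
        return blocksMap.getOrDefault(block, Collections.<String>emptySet());
    }

    public static void main(String[] args) {
        InvalidateSketch nn = new InvalidateSketch();
        nn.addReplica("blk_1", "dn1");
        nn.addReplica("blk_1", "dn2");

        nn.addToInvalidates("blk_1", "dn1");
        // dn1 is still advertised even though its replica is queued for deletion.
        System.out.println("queue only:       " + nn.getLocations("blk_1"));

        nn.addToInvalidatesAndRemove("blk_1", "dn1");
        // dn1 is no longer advertised to clients.
        System.out.println("queue and remove: " + nn.getLocations("blk_1"));
    }
}
```

In the queue-only case a client can be directed to dn1 after the datanode has deleted the block, which is the window in which the "is not valid" exception above would be logged.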


-----Original Message-----
From: Robert Evans [mailto:evans@yahoo-inc.com] 
Sent: Monday, August 19, 2013 10:11 AM
To: hdfs-dev@hadoop.apache.org
Subject: Re: mapred replication

Without the stack trace of the exceptions it is hard to tell.  The pruning
is asynchronous, but so is a node crashing with a replica on it.  The
client is supposed to detect this situation and find a new replica that
works.  I am not that familiar with the code, but I believe in some if not
all of these cases it will log the exception to indicate that something
bad happened, but it recovered.


On 8/16/13 4:40 PM, "Jay Vyas" <jayunit100@gmail.com> wrote:

>Why should this lead to an IOException?  Is it because the pruning of
>replicas is asynchronous and the datanodes try to access nonexistent
>files?  If so that seems like a pretty major bug
>On Fri, Aug 16, 2013 at 5:21 PM, Chris Nauroth
>> Hi Eric,
>> Yes, this is intentional.  The job.xml file and the job jar file get read
>> from every node running a map or reduce task.  Because of this, using a
>> higher than normal replication factor on these files improves locality.
>>  More than 3 task slots will have access to local replicas.  These files
>> tend to be much smaller than the actual data read by a job, so there tends
>> to be little harm done in terms of disk space consumption.
>> Why not create the file initially with 10 replicas instead of creating
>> with 3 and then dialing up?  I imagine this is so that job submission
>> doesn't block on a synchronous write to a long pipeline.  The extra
>> replicas aren't necessary for correctness, and a long-running job will get
>> the locality benefits in the long term once more replicas are created in
>> the background.
>> I recommend submitting a new jira describing the problem that you saw.  We
>> probably can handle this better, and a jira would be a good place to
>> discuss the trade-offs.  A few possibilities:
>> Log a warning if mapred.submit.replication < dfs.replication.
>> Skip resetting replication if mapred.submit.replication <=
>> Fail with error if mapred.submit.replication < dfs.replication.
>> Chris Nauroth
>> Hortonworks
>> http://hortonworks.com/
>> On Thu, Aug 15, 2013 at 6:21 AM, Sirianni, Eric
>> >wrote:
>> > In debugging some replication issues in our HDFS environment, I noticed
>> > that the MapReduce framework uses the following algorithm for setting
>> > replication on submitted job files:
>> >
>> > 1.     Create the file with *default* DFS replication factor (i.e.
>> > 'dfs.replication')
>> >
>> > 2.     Subsequently alter the replication of the file based on the
>> > 'mapred.submit.replication' config value
>> >
>> >   private static FSDataOutputStream createFile(FileSystem fs,
>> >       Path splitFile, Configuration job) throws IOException {
>> >     FSDataOutputStream out = FileSystem.create(fs, splitFile,
>> >         new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
>> >     int replication = job.getInt("mapred.submit.replication", 10);
>> >     fs.setReplication(splitFile, (short)replication);
>> >     writeSplitHeader(out);
>> >     return out;
>> >   }
>> >
>> > If I understand correctly, the net functional effect of this approach is
>> > that
>> >
>> > -       The initial write pipeline is set up with 'dfs.replication'
>> > (i.e. 3)
>> >
>> > -       The namenode triggers additional inter-datanode replications in
>> > the background (as it detects the blocks as "under-replicated").
>> >
>> > I'm assuming this is intentional?  Alternatively, if the
>> > mapred.submit.replication was specified on initial create, the write
>> > pipeline would be significantly larger.
>> >
>> > The reason I noticed is that we had inadvertently specified
>> > mapred.submit.replication as *less than* dfs.replication in our
>> > configuration, which caused a bunch of excess replica pruning (and
>> > ultimately IOExceptions in our datanode logs).
>> >
>> > Thanks,
>> > Eric
>> >
>> >
>Jay Vyas
