hadoop-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dmitry Goldenberg <dgoldenberg...@gmail.com>
Subject What is the recommended way to append to files on hdfs?
Date Thu, 26 May 2016 03:11:45 GMT
I'm having trouble figuring out a safe way to append to files in HDFS.

I'm using a small, 3-node Hadoop cluster (CDH v.5.3.9 to be specific). Our
process is a data pipeliner which is multi-threaded (8 threads) and it has
a stage which appends lines of delimited text to files in a dedicated
directory on HDFS. I'm using locks to synchronize access of the threads to
the buffered writers which append the data.

My first issue is deciding on the approach generally.

Approach A is to open the file, append to it, then close it for every line
appended. This seems slow and would seem to create too many small blocks,
or at least I see some such sentiment in various posts.

Approach B is to cache the writers but periodically refresh them to make
sure the list of writers doesn't grow unbounded (currently, it's one writer
per each input file processed by the pipeliner). This seems like a more
efficient approach but I imagine having open streams over a period of time
however controlled may be an issue, especially for output file readers (?)

Beyond this, my real issues are two. I am using the FileSystem Java Hadoop
API to do the appending and am intermittently getting these 2 types of

org.apache.hadoop.ipc.RemoteException: failed to create file
/output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for
client XXX.XX.XXX.XX because current leaseholder is trying to recreate file.

BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not
exist or is not under Constructionblk_1073760252_545
40{blockUCState=UNDER_RECOVERY, primaryNodeIndex=1,

Anyone have any ideas on either of those?

For the first problem, I've tried instrumenting logic discussed in this post
didn't seem to help.

I'm also interested in the role of the dfs.support.append property, if at
all applicable.

The code is more or less as follows, for getting the output stream

userGroupInfo = UserGroupInformation.*createRemoteUser*("hdfs");

Configuration conf = new Configuration();

conf.set(key1, val1);


conf.set(keyN, valN);

fileSystem = userGroupInfo.doAs(*new* PrivilegedExceptionAction<FileSystem>()

        *public* FileSystem run() *throws* Exception {

            *return* FileSystem.*get*(conf);



*org.apache.hadoop.fs.path.Path file = ...*

*public* OutputStream getOutputStream(*boolean* append) *throws* IOException

    OutputStream os = *null*;

    *synchronized* (file) {

      // If the file exists

      *if* (isFile()) {

        // See if we're to append or to overwrite

        os = (append)

          ? fs.append(file) : fs.create(file, *true*);


      // Appending to a non-existent file

      *else* *if* (append) {

        // Create the file first

        // otherwise, "failed to append to non-existent file" exception

        FSDataOutputStream dos = fs.create(file);


        // Open it for appending

        os = fs.append(file);


      // Creating a new file

      *else* {

        os = fs.create(file);



    *return* os;


View raw message