hadoop-hdfs-user mailing list archives

From Dmitry Goldenberg <dgoldenberg...@gmail.com>
Subject Re: What is the recommended way to append to files on hdfs?
Date Tue, 31 May 2016 15:46:56 GMT
Thanks, John.

Perhaps I didn't describe that part clearly, but we don't have input readers
coupled to output writers. The data is generated line by line, then directed
to threads which append lines to the files.  Originally, the threads were not
dedicated to particular files.  Then I came across
https://issues.apache.org/jira/browse/HDFS-7203 and switched the
implementation to a single-thread executor service with a blocking queue,
one such executor per output file.  This was an attempt to work
around HDFS-7203 so that multiple threads don't write to the same file, only
one dedicated thread does.
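
To illustrate, that workaround looked roughly like the sketch below (the
PerFileAppender/appendLine names are made up for this email, not our actual
code, and the create-then-append handling is simplified):

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PerFileAppender {
    private final FileSystem fs;
    // One single-thread executor per output file; each executor queues its
    // tasks on an internal blocking queue, so only one dedicated thread
    // ever appends to any given file.
    private final ConcurrentMap<String, ExecutorService> executors =
        new ConcurrentHashMap<>();

    public PerFileAppender(FileSystem fs) {
        this.fs = fs;
    }

    public void appendLine(Path path, String line) {
        executors
            .computeIfAbsent(path.toString(), p -> Executors.newSingleThreadExecutor())
            .submit(() -> {
                try {
                    // Append if the file exists, otherwise create it,
                    // mirroring the getOutputStream(true) logic below.
                    FSDataOutputStream out =
                        fs.exists(path) ? fs.append(path) : fs.create(path);
                    try {
                        out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
                    } finally {
                        out.close();
                    }
                } catch (Exception e) {
                    // ... log / handle the failed append ...
                }
            });
    }
}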

However, that approach only worked for a smaller data set. With larger data
sets, the "current leaseholder is trying to recreate file" errors came back.

The logic is indeed using try-with-resources, so presumably that should
be OK:

try (BufferedWriter bw = new BufferedWriter(
        new OutputStreamWriter(outputFile.getOutputStream(true)))) {
    bw.write(line);
} catch (Exception ex) {
    // ... handle exception ...
}

I included the getOutputStream method in my previous email; it's included
again below.

Basically, the dedicated single-thread executor service with a queue per
file didn't work for me.  I'm now trying to have just a single thread handle
all appends, inefficient as that may be.  We currently have no path to
upgrade to Hadoop 2.6.0 to see whether we'd have better luck with HDFS-7203
having been fixed...
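
In code terms, that's just the pattern above collapsed into one executor
shared across all files (again only a sketch):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// A single dedicated append thread; every append task for every output
// file is submitted here, so no two threads ever append concurrently.
ExecutorService appendExecutor = Executors.newSingleThreadExecutor();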

org.apache.hadoop.fs.Path file = ...

public OutputStream getOutputStream(boolean append) throws IOException {
    OutputStream os = null;
    synchronized (file) {
        // If the file exists
        if (isFile()) {
            // See if we're to append or to overwrite
            os = (append) ? fs.append(file) : fs.create(file, true);
        }
        // Appending to a non-existent file
        else if (append) {
            // Create the file first;
            // otherwise, "failed to append to non-existent file" exception
            FSDataOutputStream dos = fs.create(file);
            dos.close();
            // Open it for appending
            os = fs.append(file);
        }
        // Creating a new file
        else {
            os = fs.create(file);
        }
    }
    return os;
}


On Tue, May 31, 2016 at 9:10 AM, John Lilley <john.lilley@redpoint.net>
wrote:

> Dmitry,
>
> Regarding your RemoteException issue, you should check that your output
> files are explicitly closed by calling the close() method; otherwise they
> are only closed when the GC gets around to finalizing them.  If you can use
> try-with-resources, that is best.  I’ve seen these kinds of issues when one
> writer still has the file open and another writer attempts to append.
>
> More generally, I recommend keeping the same writers open for longer
> periods, especially if the writes tend to be small.  If corresponding readers
> need to see the appended data quickly (I have not tried this myself), the
> FSDataOutputStream.hflush() method is documented to make pending data
> available to readers.
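>
> As a rough sketch (untested, as I said, and the class/method names here
> are just illustrative), the writer would hold one long-lived stream and
> flush after each write rather than closing:
>
> import java.nio.charset.StandardCharsets;
>
> import org.apache.hadoop.fs.FSDataOutputStream;
>
> public class FlushingWriter {
>     // 'out' is a long-lived stream from fs.create()/fs.append() that
>     // stays open across many writes.
>     public static void append(FSDataOutputStream out, String line)
>             throws Exception {
>         out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
>         // Push the buffered bytes down the datanode pipeline so readers
>         // opening the file from now on can see them, without closing;
>         // hsync() additionally forces the datanodes to flush to disk.
>         out.hflush();
>     }
> }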
>
> You should rethink your design here:
>
> “make sure the list of writers doesn't grow unbounded (currently, it's one
> writer per each input file processed by the pipeliner)”
>
> This doesn’t sound like a good design, coupling your input readers
> directly with output writers.  Instead, put the writers in separate threads
> and push byte arrays to be written to them via a queue, as in the sketch
> below.
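>
> Something like the following (just a sketch with made-up names;
> error handling and orderly shutdown are omitted):
>
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.LinkedBlockingQueue;
>
> import org.apache.hadoop.fs.FSDataOutputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> public class QueueingWriter extends Thread {
>     private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
>     private final FSDataOutputStream out;
>
>     public QueueingWriter(FileSystem fs, Path path) throws Exception {
>         // One long-lived stream, owned by this thread alone.
>         this.out = fs.exists(path) ? fs.append(path) : fs.create(path);
>     }
>
>     // Producers call this from any thread.
>     public void enqueue(byte[] bytes) throws InterruptedException {
>         queue.put(bytes);
>     }
>
>     @Override
>     public void run() {
>         try {
>             while (!isInterrupted()) {
>                 out.write(queue.take());
>             }
>         } catch (Exception e) {
>             // ... log, then fall through to close the stream ...
>         } finally {
>             try { out.close(); } catch (Exception ignored) {}
>         }
>     }
> }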
>
> John Lilley
>
> From: Dmitry Goldenberg [mailto:dgoldenberg123@gmail.com]
> Sent: Wednesday, May 25, 2016 9:12 PM
> To: user@hadoop.apache.org
> Subject: What is the recommended way to append to files on hdfs?
>
> I'm having trouble figuring out a safe way to append to files in HDFS.
>
> I'm using a small, 3-node Hadoop cluster (CDH v.5.3.9 to be specific). Our
> process is a multi-threaded (8 threads) data pipeliner, and it has a stage
> which appends lines of delimited text to files in a dedicated directory on
> HDFS. I'm using locks to synchronize access of the threads to the buffered
> writers which append the data.
>
> My first issue is deciding on the approach generally.
>
> Approach A is to open the file, append to it, then close it for every line
> appended. This seems slow, and it would appear to create too many small
> blocks; at least, I see some such sentiment in various posts.
>
> Approach B is to cache the writers but periodically refresh them to make
> sure the list of writers doesn't grow unbounded (currently, it's one writer
> per each input file processed by the pipeliner). This seems like a more
> efficient approach, but I imagine that having streams open over a period of
> time, however controlled, may be an issue, especially for readers of the
> output files (?)
>
> Beyond this, I have two real issues. I am using the Hadoop FileSystem Java
> API to do the appending and am intermittently getting these two types of
> exceptions:
>
> org.apache.hadoop.ipc.RemoteException: failed to create file
> /output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for
> client XXX.XX.XXX.XX because current leaseholder is trying to recreate file.
>
> org.apache.hadoop.ipc.RemoteException:
> BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not
> exist or is not under Construction blk_1073760252_54540{blockUCState=UNDER_RECOVERY,
> primaryNodeIndex=1,
> replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW],
> ReplicaUnderConstruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}
>
> Anyone have any ideas on either of those?
>
> For the first problem, I've tried instrumenting the logic discussed in this
> post
> <http://stackoverflow.com/questions/23833318/crashed-hdfs-client-how-to-close-remaining-open-files>,
> but it didn't seem to help.
>
> I'm also interested in the role of the dfs.support.append property, if at
> all applicable.
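>
> From what I can tell (please correct me if I'm wrong), appends are enabled
> by default on Hadoop 2.x and the property is deprecated there; on older
> releases one would set it on the client Configuration, e.g.:
>
> import org.apache.hadoop.conf.Configuration;
>
> Configuration conf = new Configuration();
> // Deprecated/ignored on 2.x, where append is enabled by default.
> conf.setBoolean("dfs.support.append", true);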
>
> The code for getting the output stream is more or less as follows:
>
> userGroupInfo = UserGroupInformation.createRemoteUser("hdfs");
>
> Configuration conf = new Configuration();
> conf.set(key1, val1);
> ....
> conf.set(keyN, valN);
>
> fileSystem = userGroupInfo.doAs(new PrivilegedExceptionAction<FileSystem>() {
>     public FileSystem run() throws Exception {
>         return FileSystem.get(conf);
>     }
> });
>
> org.apache.hadoop.fs.Path file = ...
>
> public OutputStream getOutputStream(boolean append) throws IOException {
>     OutputStream os = null;
>     synchronized (file) {
>         // If the file exists
>         if (isFile()) {
>             // See if we're to append or to overwrite
>             os = (append) ? fs.append(file) : fs.create(file, true);
>         }
>         // Appending to a non-existent file
>         else if (append) {
>             // Create the file first;
>             // otherwise, "failed to append to non-existent file" exception
>             FSDataOutputStream dos = fs.create(file);
>             dos.close();
>             // Open it for appending
>             os = fs.append(file);
>         }
>         // Creating a new file
>         else {
>             os = fs.create(file);
>         }
>     }
>     return os;
> }
