hadoop-user mailing list archives

From John Lilley <john.lil...@redpoint.net>
Subject RE: What is the recommended way to append to files on hdfs?
Date Tue, 31 May 2016 13:10:31 GMT
Dmitry,

Regarding your RemoteException issue, you should check that your output files are explicitly
closed by calling the close() method; otherwise they are only closed when the GC gets around
to finalizing them.  If you can use try-with-resources, that is best.  I’ve seen these kinds
of issues when one writer still has a file open and another writer attempts to append to it.
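
For example, a minimal sketch (fs, file, and lineBytes stand in for your own FileSystem, Path,
and data):

// try-with-resources guarantees close() even if a write throws,
// releasing the HDFS lease instead of waiting for GC finalization.
try (FSDataOutputStream out = fs.append(file)) {
    out.write(lineBytes);
}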

More generally, I recommend keeping the same writers open for longer periods, especially if
the writes tend to be small.  If corresponding readers need to see the appended data quickly,
the FSDataOutputStream.hflush() method is documented to make pending data available to readers
(I have not tried this myself).
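
Something along these lines (an untested sketch on my part):

// Keep one writer open across many appends; hflush() after each batch
// is documented to make the pending bytes visible to new readers.
FSDataOutputStream out = fs.append(file);
for (byte[] record : batch) {
    out.write(record);
}
out.hflush();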

You should also rethink this part of your design:
“make sure the list of writers doesn't grow unbounded (currently, it's one writer per each
input file processed by the pipeliner)”
Coupling your input readers directly to your output writers doesn’t sound like a good design.
Instead, run each writer in its own thread and push the byte arrays to be written to it via
a queue.
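
Roughly like this (a sketch only; the names are mine, not from your code):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// One dedicated writer thread per output file; pipeline threads enqueue
// byte arrays instead of sharing the stream under a lock.
final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
Thread writer = new Thread(new Runnable() {
    public void run() {
        try (FSDataOutputStream out = fs.append(file)) {
            while (true) {
                byte[] record = queue.take();   // blocks until data arrives
                if (record.length == 0) break;  // empty array as a poison pill
                out.write(record);
            }
        } catch (Exception e) {
            // log and surface the failure to the pipeline
        }
    }
});
writer.start();

queue.put(lineBytes);    // producers never touch the stream directly
queue.put(new byte[0]);  // shut the writer down when the input is done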

John Lilley

From: Dmitry Goldenberg [mailto:dgoldenberg123@gmail.com]
Sent: Wednesday, May 25, 2016 9:12 PM
To: user@hadoop.apache.org
Subject: What is the recommended way to append to files on hdfs?


I'm having trouble figuring out a safe way to append to files in HDFS.

I'm using a small, 3-node Hadoop cluster (CDH v.5.3.9, to be specific). Our process is a
multi-threaded (8 threads) data pipeliner with a stage that appends lines of delimited text
to files in a dedicated directory on HDFS. I'm using locks to synchronize the threads' access
to the buffered writers that append the data.

My first issue is deciding on the general approach.

Approach A is to open the file, append to it, and close it for every line appended. This
seems slow and, judging by the sentiment I see in various posts, likely to create too many
small blocks.

Approach B is to cache the writers but periodically refresh them to make sure the list of
writers doesn't grow unbounded (currently, it's one writer per input file processed by the
pipeliner). This seems more efficient, but I imagine that keeping streams open over a period
of time, however controlled, may be an issue, especially for readers of the output files (?)
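
A rough sketch of what I mean by approach B (simplified; the names here are made up for the
question):

import java.util.HashMap;
import java.util.Map;

// One open writer per output path; closeAll() is the periodic refresh,
// closing every stream so the map cannot grow unbounded.
private final Map<String, FSDataOutputStream> writers =
    new HashMap<String, FSDataOutputStream>();

synchronized FSDataOutputStream writerFor(Path path) throws IOException {
    FSDataOutputStream out = writers.get(path.toString());
    if (out == null) {
        out = fs.exists(path) ? fs.append(path) : fs.create(path);
        writers.put(path.toString(), out);
    }
    return out;
}

synchronized void closeAll() throws IOException {
    for (FSDataOutputStream out : writers.values()) {
        out.close();
    }
    writers.clear();
}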

Beyond this, I have two real issues. I am using the FileSystem Java Hadoop API to do the
appending and am intermittently getting these two types of exceptions:

org.apache.hadoop.ipc.RemoteException: failed to create file /output/acme_20160524_1.txt for
DFSClient_NONMAPREDUCE_271210261_1 for client XXX.XX.XXX.XX because current leaseholder is
trying to recreate file.

org.apache.hadoop.ipc.RemoteException: BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540
does not exist or is not under Construction blk_1073760252_54540{blockUCState=UNDER_RECOVERY,
primaryNodeIndex=1, replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW],
ReplicaUnderConstruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}

Anyone have any ideas on either of those?

For the first problem, I've tried the logic discussed in this post
<http://stackoverflow.com/questions/23833318/crashed-hdfs-client-how-to-close-remaining-open-files>,
but it didn't seem to help.

I'm also interested in the role of the dfs.support.append property, if at all applicable.
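
For what it's worth, I know it can be set programmatically via the standard Configuration
setter, though my understanding is that on Hadoop 2.x (which CDH 5 ships) append is always
enabled and the flag is ignored:

// Reportedly a no-op on Hadoop 2.x / CDH 5, where append is always on:
conf.setBoolean("dfs.support.append", true);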

The code for getting the output stream is more or less as follows:
import java.io.IOException;
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

// Obtain the FileSystem as the "hdfs" user; fs is a FileSystem field
// shared with getOutputStream() below.
UserGroupInformation userGroupInfo = UserGroupInformation.createRemoteUser("hdfs");
final Configuration conf = new Configuration();
conf.set(key1, val1);
....
conf.set(keyN, valN);
fs = userGroupInfo.doAs(new PrivilegedExceptionAction<FileSystem>() {
    public FileSystem run() throws Exception {
        return FileSystem.get(conf);
    }
});

Path file = ...

public OutputStream getOutputStream(boolean append) throws IOException {
    OutputStream os = null;
    synchronized (file) {
        // The file already exists: append to it or overwrite it.
        if (fs.exists(file)) {
            os = append ? fs.append(file) : fs.create(file, true);
        }
        // Appending to a non-existent file: create it first, otherwise
        // append() fails with "failed to append to non-existent file".
        else if (append) {
            FSDataOutputStream dos = fs.create(file);
            dos.close();
            os = fs.append(file);
        }
        // Creating a new file from scratch.
        else {
            os = fs.create(file);
        }
    }
    return os;
}
