impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taras...@apache.org
Subject [20/30] incubator-impala git commit: IMPALA-4669: [KUTIL] Import kudu_util library from kudu@314c9d8
Date Sat, 17 Jun 2017 07:25:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/env.h b/be/src/kudu/util/env.h
new file mode 100644
index 0000000..7f06c4e
--- /dev/null
+++ b/be/src/kudu/util/env.h
@@ -0,0 +1,643 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// An Env is an interface used by the kudu implementation to access
+// operating system functionality like the filesystem etc.  Callers
+// may wish to provide a custom Env object when opening a database to
+// get fine-grained control; e.g., to rate limit file system operations.
+//
+// All Env implementations are safe for concurrent access from
+// multiple threads without any external synchronization.
+
+#ifndef STORAGE_LEVELDB_INCLUDE_ENV_H_
+#define STORAGE_LEVELDB_INCLUDE_ENV_H_
+
+#include <cstdarg>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/callback_forward.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class faststring;
+class FileLock;
+class RandomAccessFile;
+class RWFile;
+class SequentialFile;
+class Slice;
+class WritableFile;
+
+struct RandomAccessFileOptions;
+struct RWFileOptions;
+struct WritableFileOptions;
+
+// Returned by Env::GetSpaceInfo().
+struct SpaceInfo {
+  int64_t capacity_bytes; // Capacity of a filesystem, in bytes.
+  int64_t free_bytes;     // Bytes available to non-privileged processes.
+};
+
+class Env {
+ public:
+  // Governs if/how the file is created.
+  //
+  // enum value                      | file exists       | file does not exist
+  // --------------------------------+-------------------+--------------------
+  // CREATE_IF_NON_EXISTING_TRUNCATE | opens + truncates | creates
+  // CREATE_NON_EXISTING             | fails             | creates
+  // OPEN_EXISTING                   | opens             | fails
+  enum CreateMode {
+    CREATE_IF_NON_EXISTING_TRUNCATE,
+    CREATE_NON_EXISTING,
+    OPEN_EXISTING
+  };
+
+  Env() { }
+  virtual ~Env();
+
+  // Return a default environment suitable for the current operating
+  // system.  Sophisticated users may wish to provide their own Env
+  // implementation instead of relying on this default environment.
+  //
+  // The result of Default() belongs to kudu and must never be deleted.
+  static Env* Default();
+
+  // Create a brand new sequentially-readable file with the specified name.
+  // On success, stores a pointer to the new file in *result and returns OK.
+  // On failure stores NULL in *result and returns non-OK.  If the file does
+  // not exist, returns a non-OK status.
+  //
+  // The returned file will only be accessed by one thread at a time.
+  virtual Status NewSequentialFile(const std::string& fname,
+                                   std::unique_ptr<SequentialFile>* result) = 0;
+
+  // Create a brand new random access read-only file with the
+  // specified name.  On success, stores a pointer to the new file in
+  // *result and returns OK.  On failure stores NULL in *result and
+  // returns non-OK.  If the file does not exist, returns a non-OK
+  // status.
+  //
+  // The returned file may be concurrently accessed by multiple threads.
+  virtual Status NewRandomAccessFile(const std::string& fname,
+                                     std::unique_ptr<RandomAccessFile>* result) = 0;
+
+  // Like the previous NewRandomAccessFile, but allows options to be specified.
+  virtual Status NewRandomAccessFile(const RandomAccessFileOptions& opts,
+                                     const std::string& fname,
+                                     std::unique_ptr<RandomAccessFile>* result) = 0;
+
+  // Create an object that writes to a new file with the specified
+  // name.  Deletes any existing file with the same name and creates a
+  // new file.  On success, stores a pointer to the new file in
+  // *result and returns OK.  On failure stores NULL in *result and
+  // returns non-OK.
+  //
+  // The returned file will only be accessed by one thread at a time.
+  virtual Status NewWritableFile(const std::string& fname,
+                                 std::unique_ptr<WritableFile>* result) = 0;
+
+
+  // Like the previous NewWritableFile, but allows options to be
+  // specified.
+  virtual Status NewWritableFile(const WritableFileOptions& opts,
+                                 const std::string& fname,
+                                 std::unique_ptr<WritableFile>* result) = 0;
+
+  // Creates a new WritableFile provided the name_template parameter.
+  // The last six characters of name_template must be "XXXXXX" and these are
+  // replaced with a string that makes the filename unique.
+  // The resulting created filename, if successful, will be stored in the
+  // created_filename out parameter.
+  // The file is created with permissions 0600, that is, read plus write for
+  // owner only. The implementation will create the file in a secure manner,
+  // and will return an error Status if it is unable to open the file.
+  virtual Status NewTempWritableFile(const WritableFileOptions& opts,
+                                     const std::string& name_template,
+                                     std::string* created_filename,
+                                     std::unique_ptr<WritableFile>* result) = 0;
+
+  // Creates a new readable and writable file. If a file with the same name
+  // already exists on disk, it is deleted.
+  //
+  // Some of the methods of the new file may be accessed concurrently,
+  // while others are only safe for access by one thread at a time.
+  virtual Status NewRWFile(const std::string& fname,
+                           std::unique_ptr<RWFile>* result) = 0;
+
+  // Like the previous NewRWFile, but allows options to be specified.
+  virtual Status NewRWFile(const RWFileOptions& opts,
+                           const std::string& fname,
+                           std::unique_ptr<RWFile>* result) = 0;
+
+  // Same as above for NewTempWritableFile(), but for an RWFile.
+  virtual Status NewTempRWFile(const RWFileOptions& opts,
+                               const std::string& name_template,
+                               std::string* created_filename,
+                               std::unique_ptr<RWFile>* res) = 0;
+
+  // Returns true iff the named file exists.
+  virtual bool FileExists(const std::string& fname) = 0;
+
+  // Store in *result the names of the children of the specified directory.
+  // The names are relative to "dir".
+  // Original contents of *result are dropped.
+  virtual Status GetChildren(const std::string& dir,
+                             std::vector<std::string>* result) = 0;
+
+  // Delete the named file.
+  virtual Status DeleteFile(const std::string& fname) = 0;
+
+  // Create the specified directory.
+  virtual Status CreateDir(const std::string& dirname) = 0;
+
+  // Delete the specified directory.
+  virtual Status DeleteDir(const std::string& dirname) = 0;
+
+  // Return the current working directory.
+  virtual Status GetCurrentWorkingDir(std::string* cwd) const = 0;
+
+  // Change the current working directory.
+  virtual Status ChangeDir(const std::string& dest) = 0;
+
+  // Synchronize the entry for a specific directory.
+  virtual Status SyncDir(const std::string& dirname) = 0;
+
+  // Recursively delete the specified directory.
+  // This should operate safely, not following any symlinks, etc.
+  virtual Status DeleteRecursively(const std::string &dirname) = 0;
+
+  // Store the logical size of fname in *file_size.
+  virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 0;
+
+  // Store the physical size of fname in *file_size.
+  //
+  // This differs from GetFileSize() in that it returns the actual amount
+  // of space consumed by the file, not the user-facing file size.
+  virtual Status GetFileSizeOnDisk(const std::string& fname, uint64_t* file_size) = 0;
+
+  // Walk 'root' recursively, looking up the amount of space used by each file
+  // as reported by GetFileSizeOnDisk(), storing the grand total in 'bytes_used'.
+  virtual Status GetFileSizeOnDiskRecursively(const std::string& root, uint64_t* bytes_used) = 0;
+
+  // Returns the modified time of the file in microseconds.
+  //
+  // The timestamp is a 'system' timestamp, and is not guaranteed to be
+  // monotonic, or have any other consistency properties. The granularity of the
+  // timestamp is not guaranteed, and may be as high as 1 second on some
+  // platforms. The timestamp is not guaranteed to be anchored to any particular
+  // epoch.
+  virtual Status GetFileModifiedTime(const std::string& fname, int64_t* timestamp) = 0;
+
+  // Store the block size of the filesystem where fname resides in
+  // *block_size. fname must exist but it may be a file or a directory.
+  virtual Status GetBlockSize(const std::string& fname, uint64_t* block_size) = 0;
+
+  // Determine the capacity and number of bytes free on the filesystem
+  // specified by 'path'. "Free space" accounting on the underlying filesystem
+  // may be more coarse than single bytes.
+  virtual Status GetSpaceInfo(const std::string& path, SpaceInfo* space_info) = 0;
+
+  // Rename file src to target.
+  virtual Status RenameFile(const std::string& src,
+                            const std::string& target) = 0;
+
+  // Lock the specified file.  Used to prevent concurrent access to
+  // the same db by multiple processes.  On failure, stores NULL in
+  // *lock and returns non-OK.
+  //
+  // On success, stores a pointer to the object that represents the
+  // acquired lock in *lock and returns OK.  The caller should call
+  // UnlockFile(*lock) to release the lock.  If the process exits,
+  // the lock will be automatically released.
+  //
+  // If somebody else already holds the lock, finishes immediately
+  // with a failure.  I.e., this call does not wait for existing locks
+  // to go away.
+  //
+  // May create the named file if it does not already exist.
+  virtual Status LockFile(const std::string& fname, FileLock** lock) = 0;
+
+  // Release the lock acquired by a previous successful call to LockFile.
+  // REQUIRES: lock was returned by a successful LockFile() call
+  // REQUIRES: lock has not already been unlocked.
+  virtual Status UnlockFile(FileLock* lock) = 0;
+
+  // *path is set to a temporary directory that can be used for testing. It may
+  // or may not have just been created. The directory may or may not differ
+  // between runs of the same process, but subsequent calls will return the
+  // same directory.
+  virtual Status GetTestDirectory(std::string* path) = 0;
+
+  // Returns the number of micro-seconds since some fixed point in time. Only
+  // useful for computing deltas of time.
+  virtual uint64_t NowMicros() = 0;
+
+  // Sleep/delay the thread for the prescribed number of micro-seconds.
+  virtual void SleepForMicroseconds(int micros) = 0;
+
+  // Get caller's thread id.
+  virtual uint64_t gettid() = 0;
+
+  // Return the full path of the currently running executable.
+  virtual Status GetExecutablePath(std::string* path) = 0;
+
+  // Checks if the file is a directory. Returns an error if it doesn't
+  // exist, otherwise writes true or false into 'is_dir' appropriately.
+  virtual Status IsDirectory(const std::string& path, bool* is_dir) = 0;
+
+  // The kind of file found during a walk. Note that symbolic links are
+  // reported as FILE_TYPE.
+  enum FileType {
+    DIRECTORY_TYPE,
+    FILE_TYPE,
+  };
+
+  // Called for each file/directory in the walk.
+  //
+  // The first argument is the type of file.
+  // The second is the dirname of the file.
+  // The third is the basename of the file.
+  //
+  // Returning an error won't halt the walk, but it will cause it to return
+  // with an error status when it's done.
+  typedef Callback<Status(FileType, const std::string&, const std::string&)> WalkCallback;
+
+  // Whether to walk directories in pre-order or post-order.
+  enum DirectoryOrder {
+    PRE_ORDER,
+    POST_ORDER,
+  };
+
+  // Walk the filesystem subtree from 'root' down, invoking 'cb' for each
+  // file or directory found, including 'root'.
+  //
+  // The walk will not cross filesystem boundaries. It won't change the
+  // working directory, nor will it follow symbolic links.
+  virtual Status Walk(const std::string& root,
+                      DirectoryOrder order,
+                      const WalkCallback& cb) = 0;
+
+  // Finds paths on the filesystem matching a pattern.
+  //
+  // The found pathnames are added to the 'paths' vector. If no pathnames are
+  // found matching the pattern, no paths are added to the vector and an OK
+  // status is returned.
+  virtual Status Glob(const std::string& path_pattern, std::vector<std::string>* paths) = 0;
+
+  // Canonicalize 'path' by applying the following conversions:
+  // - Converts a relative path into an absolute one using the cwd.
+  // - Converts '.' and '..' references.
+  // - Resolves all symbolic links.
+  //
+  // All directory entries in 'path' must exist on the filesystem.
+  virtual Status Canonicalize(const std::string& path, std::string* result) = 0;
+
+  // Get the total amount of RAM installed on this machine.
+  virtual Status GetTotalRAMBytes(int64_t* ram) = 0;
+
+  // Get the max number of file descriptors that this process can open.
+  virtual int64_t GetOpenFileLimit() = 0;
+
+  // Increase the max number of file descriptors that this process can open as
+  // much as possible. On UNIX platforms, this means increasing the
+  // RLIMIT_NOFILE resource soft limit (the limit actually enforced by the
+  // kernel) to be equal to the hard limit.
+  virtual void IncreaseOpenFileLimit() = 0;
+
+  // Checks whether the given path resides on an ext2, ext3, or ext4
+  // filesystem.
+  //
+  // On success, 'result' contains the answer. On failure, 'result' is unset.
+  virtual Status IsOnExtFilesystem(const std::string& path, bool* result) = 0;
+
+  // Gets the kernel release string for this machine.
+  virtual std::string GetKernelRelease() = 0;
+
+  // Ensure that the file with the given path has permissions which adhere
+  // to the current configured umask (from flags.h). If the permissions are
+  // wider than the current umask, then a warning is logged and the permissions
+  // are fixed.
+  //
+  // Returns a bad Status if the file does not exist or the permissions cannot
+  // be changed.
+  virtual Status EnsureFileModeAdheresToUmask(const std::string& path) = 0;
+
+  // Special string injected into file-growing operations' random failures
+  // (if enabled).
+  //
+  // Only useful for tests.
+  static const char* const kInjectedFailureStatusMsg;
+
+ private:
+  // No copying allowed
+  Env(const Env&);
+  void operator=(const Env&);
+};
+
+// A file abstraction for reading sequentially through a file
+class SequentialFile {
+ public:
+  SequentialFile() { }
+  virtual ~SequentialFile();
+
+  // Read up to "result.size" bytes from the file.
+  // Sets "result.data" to the data that was read.
+  // If an error was encountered, returns a non-OK status
+  // and the contents of "result" are invalid.
+  //
+  // REQUIRES: External synchronization
+  virtual Status Read(Slice* result) = 0;
+
+  // Skip "n" bytes from the file. This is guaranteed to be no
+  // slower than reading the same data, but may be faster.
+  //
+  // If end of file is reached, skipping will stop at the end of the
+  // file, and Skip will return OK.
+  //
+  // REQUIRES: External synchronization
+  virtual Status Skip(uint64_t n) = 0;
+
+  // Returns the filename provided when the SequentialFile was constructed.
+  virtual const std::string& filename() const = 0;
+};
+
+// A file abstraction for randomly reading the contents of a file.
+class RandomAccessFile {
+ public:
+  RandomAccessFile() { }
+  virtual ~RandomAccessFile();
+
+  // Read up to "result.size" from the file starting at "offset".
+  // Sets "result.data" to the data that was read.
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read all of the requested bytes, an IOError is returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status Read(uint64_t offset, Slice* result) const = 0;
+
+  // Reads up to the "results" aggregate size, based on each Slice's "size",
+  // from the file starting at 'offset'.
+  // Sets each "result.data" to the data that was read.
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read all of the requested bytes, an IOError is returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status ReadV(uint64_t offset, std::vector<Slice>* results) const = 0;
+
+  // Stores the size of the file in *size.
+  virtual Status Size(uint64_t *size) const = 0;
+
+  // Returns the filename provided when the RandomAccessFile was constructed.
+  virtual const std::string& filename() const = 0;
+
+  // Returns the approximate memory usage of this RandomAccessFile including
+  // the object itself.
+  virtual size_t memory_footprint() const = 0;
+};
+
+// Creation-time options for WritableFile
+struct WritableFileOptions {
+  // Call Sync() during Close().
+  bool sync_on_close;
+
+  // See CreateMode for details.
+  Env::CreateMode mode;
+
+  WritableFileOptions()
+    : sync_on_close(false),
+      mode(Env::CREATE_IF_NON_EXISTING_TRUNCATE) { }
+};
+
+// Options specified when a file is opened for random access.
+struct RandomAccessFileOptions {
+  RandomAccessFileOptions() {}
+};
+
+// A file abstraction for sequential writing.  The implementation
+// must provide buffering since callers may append small fragments
+// at a time to the file.
+class WritableFile {
+ public:
+  enum FlushMode {
+    FLUSH_SYNC,
+    FLUSH_ASYNC
+  };
+
+  WritableFile() { }
+  virtual ~WritableFile();
+
+  virtual Status Append(const Slice& data) = 0;
+
+  // If possible, uses scatter-gather I/O to efficiently append
+  // multiple buffers to a file. Otherwise, falls back to regular I/O.
+  //
+  // For implementation specific quirks and details, see comments in
+  // implementation source code (e.g., env_posix.cc)
+  virtual Status AppendV(const std::vector<Slice>& data) = 0;
+
+  // Pre-allocates 'size' bytes for the file in the underlying filesystem.
+  // size bytes are added to the current pre-allocated size or to the current
+  // offset, whichever is bigger. In no case is the file truncated by this
+  // operation.
+  //
+  // On some implementations, preallocation is done without initializing the
+  // contents of the data blocks (as opposed to writing zeroes), requiring no
+  // IO to the data blocks.
+  //
+  // In no case is the file truncated by this operation.
+  virtual Status PreAllocate(uint64_t size) = 0;
+
+  virtual Status Close() = 0;
+
+  // Flush all dirty data (not metadata) to disk.
+  //
+  // If the flush mode is synchronous, will wait for flush to finish and
+  // return a meaningful status.
+  virtual Status Flush(FlushMode mode) = 0;
+
+  virtual Status Sync() = 0;
+
+  virtual uint64_t Size() const = 0;
+
+  // Returns the filename provided when the WritableFile was constructed.
+  virtual const std::string& filename() const = 0;
+
+ private:
+  // No copying allowed
+  WritableFile(const WritableFile&);
+  void operator=(const WritableFile&);
+};
+
+// Creation-time options for RWFile
+struct RWFileOptions {
+  // Call Sync() during Close().
+  bool sync_on_close;
+
+  // See CreateMode for details.
+  Env::CreateMode mode;
+
+  RWFileOptions()
+    : sync_on_close(false),
+      mode(Env::CREATE_IF_NON_EXISTING_TRUNCATE) { }
+};
+
+// A file abstraction for both reading and writing. No notion of a built-in
+// file offset is ever used; instead, all operations must provide an
+// explicit offset.
+//
+// All operations are safe for concurrent use by multiple threads (unless
+// noted otherwise) bearing in mind the usual filesystem coherency guarantees
+// (e.g. two threads that write concurrently to the same file offset will
+// probably yield garbage).
+class RWFile {
+ public:
+  enum FlushMode {
+    FLUSH_SYNC,
+    FLUSH_ASYNC
+  };
+
+  RWFile() {
+  }
+
+  virtual ~RWFile();
+
+  // Read up to "result.size" from the file starting at "offset".
+  // Sets "result.data" to the data that was read.
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read all of the requested bytes, an IOError is returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status Read(uint64_t offset, Slice* result) const = 0;
+
+  // Reads up to the "results" aggregate size, based on each Slice's "size",
+  // from the file starting at 'offset'.
+  // Sets each "result.data" to the data that was read.
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read all of the requested bytes, an IOError is returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status ReadV(uint64_t offset, std::vector<Slice>* results) const = 0;
+
+  // Writes 'data' to the file position given by 'offset'.
+  virtual Status Write(uint64_t offset, const Slice& data) = 0;
+
+  // Writes the 'data' vector to the file position given by 'offset'.
+  virtual Status WriteV(uint64_t offset, const std::vector<Slice>& data) = 0;
+
+  // Preallocates 'length' bytes for the file in the underlying filesystem
+  // beginning at 'offset'. It is safe to preallocate the same range
+  // repeatedly; this is an idempotent operation.
+  //
+  // On some implementations, preallocation is done without initializing the
+  // contents of the data blocks (as opposed to writing zeroes), requiring no
+  // IO to the data blocks. On such implementations, this is much faster than
+  // using Truncate() to increase the file size.
+  //
+  // In no case is the file truncated by this operation.
+  //
+  // 'mode' controls whether the file's logical size grows to include the
+  // preallocated space, or whether it remains the same.
+  enum PreAllocateMode {
+    CHANGE_FILE_SIZE,
+    DONT_CHANGE_FILE_SIZE
+  };
+  virtual Status PreAllocate(uint64_t offset,
+                             size_t length,
+                             PreAllocateMode mode) = 0;
+
+  // Truncate the file. If 'length' is less than the previous file size, the
+  // extra data will be lost. If 'length' is greater than the previous file
+  // size, the file length is extended, and the extended portion will contain
+  // null bytes ('\0').
+  virtual Status Truncate(uint64_t length) = 0;
+
+  // Deallocates space given by 'offset' and 'length' from the file,
+  // effectively "punching a hole" in it. The space will be reclaimed by
+  // the filesystem and reads to that range will return zeroes. Useful
+  // for making whole files sparse.
+  //
+  // Filesystems that don't implement this will return an error.
+  virtual Status PunchHole(uint64_t offset, size_t length) = 0;
+
+  // Flushes the range of dirty data (not metadata) given by 'offset' and
+  // 'length' to disk. If length is 0, all bytes from 'offset' to the end
+  // of the file are flushed.
+  //
+  // If the flush mode is synchronous, will wait for flush to finish and
+  // return a meaningful status.
+  virtual Status Flush(FlushMode mode, uint64_t offset, size_t length) = 0;
+
+  // Synchronously flushes all dirty file data and metadata to disk. Upon
+  // returning successfully, all previously issued file changes have been
+  // made durable.
+  virtual Status Sync() = 0;
+
+  // Closes the file, optionally calling Sync() on it if the file was
+  // created with the sync_on_close option enabled.
+  //
+  // Not thread-safe.
+  virtual Status Close() = 0;
+
+  // Retrieves the file's size.
+  virtual Status Size(uint64_t* size) const = 0;
+
+  // Retrieve a map of the file's live extents.
+  //
+  // Each map entry is an offset and size representing a section of live file
+  // data. Any byte offset not contained in a map entry implicitly belongs to a
+  // "hole" in the (sparse) file.
+  //
+  // If the underlying filesystem does not support extents, map entries
+  // represent runs of adjacent fixed-size filesystem blocks instead. If the
+  // platform doesn't support fetching extents at all, a NotSupported status
+  // will be returned.
+  typedef std::map<uint64_t, uint64_t> ExtentMap;
+  virtual Status GetExtentMap(ExtentMap* out) const = 0;
+
+  // Returns the filename provided when the RWFile was constructed.
+  virtual const std::string& filename() const = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(RWFile);
+};
+
+// Identifies a locked file.
+class FileLock {
+ public:
+  FileLock() { }
+  virtual ~FileLock();
+ private:
+  // No copying allowed
+  FileLock(const FileLock&);
+  void operator=(const FileLock&);
+};
+
+// A utility routine: write "data" to the named file.
+extern Status WriteStringToFile(Env* env, const Slice& data,
+                                const std::string& fname);
+
+// A utility routine: read contents of named file into *data
+extern Status ReadFileToString(Env* env, const std::string& fname,
+                               faststring* data);
+
+}  // namespace kudu
+
+#endif  // STORAGE_LEVELDB_INCLUDE_ENV_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env_posix.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/env_posix.cc b/be/src/kudu/util/env_posix.cc
new file mode 100644
index 0000000..a3998fc
--- /dev/null
+++ b/be/src/kudu/util/env_posix.cc
@@ -0,0 +1,1608 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include <dirent.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <fts.h>
+#include <glob.h>
+#include <limits.h>
+#include <pthread.h>
+#include <sys/mman.h>
+#include <sys/resource.h>
+#include <sys/stat.h>
+#include <sys/statvfs.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/utsname.h>
+#include <unistd.h>
+
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <ctime>
+#include <memory>
+#include <numeric>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/thread_restrictions.h"
+#include "kudu/util/trace.h"
+
+#if defined(__APPLE__)
+#include <mach-o/dyld.h>
+#include <sys/sysctl.h>
+#else
+#include <linux/falloc.h>
+#include <linux/fiemap.h>
+#include <linux/fs.h>
+#include <linux/magic.h>
+#include <sys/ioctl.h>
+#include <sys/sysinfo.h>
+#include <sys/vfs.h>
+#endif  // defined(__APPLE__)
+
+// Copied from falloc.h. Useful for older kernels that lack support for
+// hole punching; fallocate(2) will return EOPNOTSUPP.
+//
+// Each constant is defined here only when the headers included above did not
+// already provide it.
+#ifndef FALLOC_FL_KEEP_SIZE
+#define FALLOC_FL_KEEP_SIZE 0x01 /* default is extend size */
+#endif
+#ifndef FALLOC_FL_PUNCH_HOLE
+#define FALLOC_FL_PUNCH_HOLE  0x02 /* de-allocates range */
+#endif
+
+// For platforms without fdatasync and unlocked_stdio (like OS X): map the
+// calls onto their closest equivalents.
+//
+// The substitution is keyed on the platform rather than on
+// "#ifndef fdatasync". Where fdatasync() is an ordinary function and not a
+// preprocessor macro (as on Linux), an #ifndef test always succeeds and would
+// silently turn every fdatasync() call in this file into the more expensive
+// fsync(), regardless of the env_use_fsync flag.
+#if defined(__APPLE__)
+#define fdatasync fsync
+#define fread_unlocked fread
+#endif  // defined(__APPLE__)
+
+// Retry on EINTR for functions like read() that return -1 on error.
+//
+// "err" must be an lvalue of signed integer type (enforced at compile time);
+// it receives the value of "expr". "expr" is evaluated again for as long as
+// it yields -1 with errno set to EINTR, so it must be safe to repeat.
+#define RETRY_ON_EINTR(err, expr) do { \
+  static_assert(std::is_signed<decltype(err)>::value == true, \
+                #err " must be a signed integer"); \
+  (err) = (expr); \
+} while ((err) == -1 && errno == EINTR)
+
+// Same as the above, but for stream API calls like fread() and fwrite().
+//
+// "nread" must be an lvalue of unsigned integer type (enforced at compile
+// time); it receives the item count returned by "expr". "expr" is evaluated
+// again for as long as it transfers zero items because the call was
+// interrupted by a signal, so it must be safe to repeat.
+//
+// ferror() only reports whether the stream's error indicator is set; the
+// cause of the failure is carried by errno, so that is what is compared
+// against EINTR. errno is cleared before each attempt so that a stale EINTR
+// left behind by an earlier call cannot cause a spurious retry.
+#define STREAM_RETRY_ON_EINTR(nread, stream, expr) do { \
+  static_assert(std::is_unsigned<decltype(nread)>::value == true, \
+                #nread " must be an unsigned integer"); \
+  errno = 0; \
+  (nread) = (expr); \
+} while ((nread) == 0 && ferror(stream) && errno == EINTR)
+
+// See KUDU-588 for details.
+// Read by DoSync(): when set, fsync() is called; otherwise fdatasync().
+DEFINE_bool(env_use_fsync, false,
+            "Use fsync(2) instead of fdatasync(2) for synchronizing dirty "
+            "data to disk.");
+TAG_FLAG(env_use_fsync, advanced);
+TAG_FLAG(env_use_fsync, evolving);
+
+// Read by IOError(): when set, an EIO error number triggers LOG(FATAL).
+DEFINE_bool(suicide_on_eio, true,
+            "Kill the process if an I/O operation results in EIO");
+TAG_FLAG(suicide_on_eio, advanced);
+
+// Read by DoSync(): when set, it returns OK without issuing any sync call.
+DEFINE_bool(never_fsync, false,
+            "Never fsync() anything to disk. This is used by certain test cases to "
+            "speed up runtime. This is very unsafe to use in production.");
+TAG_FLAG(never_fsync, advanced);
+TAG_FLAG(never_fsync, unsafe);
+
+// Passed to MAYBE_RETURN_FAILURE() at the top of several I/O operations in
+// this file.
+DEFINE_double(env_inject_io_error, 0.0,
+              "Fraction of the time that certain I/O operations will fail");
+TAG_FLAG(env_inject_io_error, hidden);
+
+// When positive, DoReadV() shortens the result of its first preadv() call by
+// this many bytes.
+DEFINE_int32(env_inject_short_read_bytes, 0,
+             "The number of bytes less than the requested bytes to read");
+TAG_FLAG(env_inject_short_read_bytes, hidden);
+// When positive, DoWriteV() simulates a short write on its first pwritev()
+// call.
+DEFINE_int32(env_inject_short_write_bytes, 0,
+             "The number of bytes less than the requested bytes to write");
+TAG_FLAG(env_inject_short_write_bytes, hidden);
+
+using base::subtle::Atomic64;
+using base::subtle::Barrier_AtomicIncrement;
+using std::accumulate;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+static __thread uint64_t thread_local_id;
+static Atomic64 cur_thread_local_id_;
+
+namespace kudu {
+
+const char* const Env::kInjectedFailureStatusMsg = "INJECTED FAILURE";
+
+namespace {
+
+#if defined(__APPLE__)
+// Stand-in for Linux's fallocate() file preallocation call on OS X.
+//
+// Only mode 0 is accepted (checked with CHECK_EQ). Returns 0 on success or
+// the negative result of the system call that failed.
+int fallocate(int fd, int mode, off_t offset, off_t len) {
+  CHECK_EQ(mode, 0);
+  const off_t wanted_size = offset + len;
+
+  struct stat file_stat;
+  const int stat_rc = fstat(fd, &file_stat);
+  if (stat_rc < 0) {
+    return stat_rc;
+  }
+
+  const bool needs_more_blocks = file_stat.st_blocks * 512 < wanted_size;
+  if (needs_more_blocks) {
+    // The offset field appears to be ignored: space always seems to be
+    // allocated from 0 up to the requested size, probably because OS X does
+    // not support sparse files.
+    fstore_t store = {F_ALLOCATECONTIG, F_PEOFPOSMODE, 0, wanted_size};
+    if (fcntl(fd, F_PREALLOCATE, &store) < 0) {
+      LOG(INFO) << "Unable to allocate contiguous disk space, attempting non-contiguous allocation";
+      // Second attempt without the contiguity requirement.
+      store.fst_flags = F_ALLOCATEALL;
+      const int alloc_rc = fcntl(fd, F_PREALLOCATE, &store);
+      if (alloc_rc < 0) {
+        return alloc_rc;
+      }
+    }
+  }
+
+  if (file_stat.st_size >= wanted_size) {
+    return 0;
+  }
+
+  // fcntl leaves the file size alone, so grow the file explicitly.
+  int truncate_rc;
+  RETRY_ON_EINTR(truncate_rc, ftruncate(fd, wanted_size));
+  return truncate_rc;
+}
+
+// Stand-in for Linux's preadv() on OS X: fills each buffer in turn with one
+// pread() call, stopping at the first short read.
+//
+// Returns the number of bytes read in total, or the negative pread() result
+// if a call fails.
+ssize_t preadv(int fd, const struct iovec* iovec, int count, off_t offset) {
+  ssize_t bytes_so_far = 0;
+  off_t position = offset;
+  int idx = 0;
+  while (idx < count) {
+    const struct iovec& buf = iovec[idx];
+    ssize_t got;
+    RETRY_ON_EINTR(got, pread(fd, buf.iov_base, buf.iov_len, position));
+    if (got < 0) {
+      return got;
+    }
+    bytes_so_far += got;
+    if (static_cast<size_t>(got) < buf.iov_len) {
+      // Short read: report what has been read so far.
+      break;
+    }
+    position += buf.iov_len;
+    ++idx;
+  }
+  return bytes_so_far;
+}
+
+// Stand-in for Linux's pwritev() on OS X: writes each buffer in turn with one
+// pwrite() call, stopping at the first short write.
+//
+// Returns the number of bytes written in total, or the negative pwrite()
+// result if a call fails.
+ssize_t pwritev(int fd, const struct iovec* iovec, int count, off_t offset) {
+  ssize_t bytes_so_far = 0;
+  off_t position = offset;
+  int idx = 0;
+  while (idx < count) {
+    const struct iovec& buf = iovec[idx];
+    ssize_t put;
+    RETRY_ON_EINTR(put, pwrite(fd, buf.iov_base, buf.iov_len, position));
+    if (put < 0) {
+      return put;
+    }
+    bytes_so_far += put;
+    if (static_cast<size_t>(put) < buf.iov_len) {
+      // Short write: report what has been written so far.
+      break;
+    }
+    position += buf.iov_len;
+    ++idx;
+  }
+  return bytes_so_far;
+}
+#endif
+
+
+// Holds a file descriptor and closes it in the destructor. A failed close()
+// is logged as a warning.
+class ScopedFdCloser {
+ public:
+  explicit ScopedFdCloser(int fd) : fd_(fd) {}
+
+  ~ScopedFdCloser() {
+    ThreadRestrictions::AssertIOAllowed();
+    const bool close_failed = ::close(fd_) != 0;
+    if (PREDICT_FALSE(close_failed)) {
+      PLOG(WARNING) << "Failed to close fd " << fd_;
+    }
+  }
+
+ private:
+  int fd_;
+};
+
+// Converts an errno value into a Status, using 'context' as the message.
+//
+// ENOENT, EEXIST and EOPNOTSUPP map to NotFound, AlreadyPresent and
+// NotSupported respectively; everything else becomes IOError. With
+// --suicide_on_eio set, EIO is logged with LOG(FATAL).
+Status IOError(const std::string& context, int err_number) {
+  if (err_number == ENOENT) {
+    return Status::NotFound(context, ErrnoToString(err_number), err_number);
+  }
+  if (err_number == EEXIST) {
+    return Status::AlreadyPresent(context, ErrnoToString(err_number), err_number);
+  }
+  if (err_number == EOPNOTSUPP) {
+    return Status::NotSupported(context, ErrnoToString(err_number), err_number);
+  }
+  if (err_number == EIO && FLAGS_suicide_on_eio) {
+    // TODO: This is very, very coarse-grained. A more comprehensive
+    // approach is described in KUDU-616.
+    LOG(FATAL) << "Fatal I/O error, context: " << context;
+  }
+  return Status::IOError(context, ErrnoToString(err_number), err_number);
+}
+
+Status DoSync(int fd, const string& filename) {
+  MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                       Status::IOError(Env::kInjectedFailureStatusMsg));
+  ThreadRestrictions::AssertIOAllowed();
+  if (FLAGS_never_fsync) return Status::OK();
+  if (FLAGS_env_use_fsync) {
+    TRACE_COUNTER_SCOPE_LATENCY_US("fsync_us");
+    TRACE_COUNTER_INCREMENT("fsync", 1);
+    if (fsync(fd) < 0) {
+      return IOError(filename, errno);
+    }
+  } else {
+    TRACE_COUNTER_INCREMENT("fdatasync", 1);
+    TRACE_COUNTER_SCOPE_LATENCY_US("fdatasync_us");
+    if (fdatasync(fd) < 0) {
+      return IOError(filename, errno);
+    }
+  }
+  return Status::OK();
+}
+
+// Opens 'filename' read-write according to 'mode' and stores the resulting
+// descriptor in '*fd'.
+//
+// Returns NotSupported for an unrecognized mode and the IOError() conversion
+// of errno when open() fails; '*fd' is written only on success.
+Status DoOpen(const string& filename, Env::CreateMode mode, int* fd) {
+  ThreadRestrictions::AssertIOAllowed();
+
+  int create_flags = 0;
+  if (mode == Env::CREATE_IF_NON_EXISTING_TRUNCATE) {
+    create_flags = O_CREAT | O_TRUNC;
+  } else if (mode == Env::CREATE_NON_EXISTING) {
+    create_flags = O_CREAT | O_EXCL;
+  } else if (mode != Env::OPEN_EXISTING) {
+    return Status::NotSupported(Substitute("Unknown create mode $0", mode));
+  }
+
+  const int opened = open(filename.c_str(), O_RDWR | create_flags, 0666);
+  if (opened < 0) {
+    return IOError(filename, errno);
+  }
+  *fd = opened;
+  return Status::OK();
+}
+
+// Fills every slice in 'results' with data read from 'fd' starting at
+// 'offset', issuing as many preadv() calls as it takes.
+//
+// 'filename' is only used when building an error status. Returns OK once all
+// requested bytes have been read, the IOError() conversion of errno if
+// preadv() fails, and an IOError status if preadv() returns 0 (EOF) before
+// the request is satisfied.
+Status DoReadV(int fd, const string& filename, uint64_t offset, vector<Slice>* results) {
+  ThreadRestrictions::AssertIOAllowed();
+
+  // Convert the results into the iovec vector to request
+  // and calculate the total bytes requested
+  size_t bytes_req = 0;
+  size_t iov_size = results->size();
+  struct iovec iov[iov_size];
+  for (size_t i = 0; i < iov_size; i++) {
+    Slice& result = (*results)[i];
+    bytes_req += result.size();
+    iov[i] = { result.mutable_data(), result.size() };
+  }
+
+  // cur_offset: file position of the next request.
+  // completed_iov: number of leading iovec entries that are fully read.
+  // rem: bytes still outstanding.
+  uint64_t cur_offset = offset;
+  size_t completed_iov = 0;
+  size_t rem = bytes_req;
+  while (rem > 0) {
+    // Never request more than IOV_MAX in one request
+    size_t iov_count = std::min(iov_size - completed_iov, static_cast<size_t>(IOV_MAX));
+    ssize_t r;
+    RETRY_ON_EINTR(r, preadv(fd, iov + completed_iov, iov_count, cur_offset));
+
+    // Fake a short read for testing
+    // (only applied to the first request, i.e. while rem == bytes_req).
+    if (PREDICT_FALSE(FLAGS_env_inject_short_read_bytes > 0 && rem == bytes_req)) {
+      DCHECK_LT(FLAGS_env_inject_short_read_bytes, r);
+      r -= FLAGS_env_inject_short_read_bytes;
+    }
+
+    if (PREDICT_FALSE(r < 0)) {
+      // An error: return a non-ok status.
+      return IOError(filename, errno);
+    }
+    if (PREDICT_FALSE(r == 0)) {
+      // EOF.
+      return Status::IOError(
+          Substitute("EOF trying to read $0 bytes at offset $1", bytes_req, offset));
+    }
+    if (PREDICT_TRUE(r == rem)) {
+      // All requested bytes were read. This is almost always the case.
+      return Status::OK();
+    }
+    DCHECK_LE(r, rem);
+    // Adjust iovec vector based on bytes read for the next request
+    ssize_t bytes_rem = r;
+    for (size_t i = completed_iov; i < iov_size; i++) {
+      if (bytes_rem >= iov[i].iov_len) {
+        // The full length of this iovec was read
+        completed_iov++;
+        bytes_rem -= iov[i].iov_len;
+      } else {
+        // Partially read this result.
+        // Adjust the iov_len and iov_base to request only the missing data.
+        iov[i].iov_base = static_cast<uint8_t *>(iov[i].iov_base) + bytes_rem;
+        iov[i].iov_len -= bytes_rem;
+        break; // Don't need to adjust remaining iovec's
+      }
+    }
+    cur_offset += r;
+    rem -= r;
+  }
+  DCHECK_EQ(0, rem);
+  return Status::OK();
+}
+
+// Writes every slice in 'data' to 'fd' starting at 'offset', issuing as many
+// pwritev() calls as it takes for all bytes to be written.
+//
+// 'filename' is only used when building an error status. Returns OK once all
+// bytes have been written, the IOError() conversion of errno if pwritev()
+// fails, or the injected failure status when --env_inject_io_error fires.
+Status DoWriteV(int fd, const string& filename, uint64_t offset,
+                const vector<Slice>& data) {
+  MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                       Status::IOError(Env::kInjectedFailureStatusMsg));
+  ThreadRestrictions::AssertIOAllowed();
+
+  // Convert the input slices into the iovec vector to submit
+  // and calculate the total bytes to write.
+  size_t bytes_req = 0;
+  size_t iov_size = data.size();
+  struct iovec iov[iov_size];
+  for (size_t i = 0; i < iov_size; i++) {
+    const Slice& result = data[i];
+    bytes_req += result.size();
+    // iovec::iov_base is not const-qualified; pwritev() only reads from it.
+    iov[i] = { const_cast<uint8_t*>(result.data()), result.size() };
+  }
+
+  // cur_offset: file position of the next request.
+  // completed_iov: number of leading iovec entries that are fully written.
+  // rem: bytes still outstanding.
+  uint64_t cur_offset = offset;
+  size_t completed_iov = 0;
+  size_t rem = bytes_req;
+  while (rem > 0) {
+    // Never request more than IOV_MAX in one request.
+    size_t iov_count = std::min(iov_size - completed_iov, static_cast<size_t>(IOV_MAX));
+    ssize_t w;
+    RETRY_ON_EINTR(w, pwritev(fd, iov + completed_iov, iov_count, cur_offset));
+
+    // Fake a short write for testing (only applied to the first request,
+    // i.e. while rem == bytes_req). The amount subtracted is the short
+    // *write* setting, matching the flag tested in the condition.
+    if (PREDICT_FALSE(FLAGS_env_inject_short_write_bytes > 0 && rem == bytes_req)) {
+      DCHECK_LT(FLAGS_env_inject_short_write_bytes, w);
+      w -= FLAGS_env_inject_short_write_bytes;
+    }
+
+    if (PREDICT_FALSE(w < 0)) {
+      // An error: return a non-ok status.
+      return IOError(filename, errno);
+    }
+
+    DCHECK_LE(w, rem);
+
+    if (PREDICT_TRUE(w == rem)) {
+      // All requested bytes were written. This is almost always the case.
+      return Status::OK();
+    }
+    // Adjust iovec vector based on bytes written for the next request.
+    ssize_t bytes_rem = w;
+    for (size_t i = completed_iov; i < iov_size; i++) {
+      if (bytes_rem >= iov[i].iov_len) {
+        // The full length of this iovec was written.
+        completed_iov++;
+        bytes_rem -= iov[i].iov_len;
+      } else {
+        // Partially wrote this result.
+        // Adjust the iov_len and iov_base to write only the missing data.
+        iov[i].iov_base = static_cast<uint8_t *>(iov[i].iov_base) + bytes_rem;
+        iov[i].iov_len -= bytes_rem;
+        break; // Don't need to adjust remaining iovec's.
+      }
+    }
+    cur_offset += w;
+    rem -= w;
+  }
+  DCHECK_EQ(0, rem);
+  return Status::OK();
+}
+
+// SequentialFile implementation on top of a stdio FILE*. The stream is
+// closed with fclose() in the destructor.
+class PosixSequentialFile: public SequentialFile {
+ public:
+  PosixSequentialFile(std::string fname, FILE* f)
+      : filename_(std::move(fname)), file_(f) {}
+  virtual ~PosixSequentialFile() { fclose(file_); }
+
+  // Reads up to result->size() bytes into the buffer behind 'result'. When
+  // the end of the file cuts the read short, 'result' is truncated to the
+  // number of bytes read and OK is returned; any other short read is
+  // reported through IOError().
+  virtual Status Read(Slice* result) OVERRIDE {
+    ThreadRestrictions::AssertIOAllowed();
+    size_t bytes_read;
+    STREAM_RETRY_ON_EINTR(bytes_read, file_, fread_unlocked(result->mutable_data(), 1,
+                                                            result->size(), file_));
+    if (bytes_read >= result->size()) {
+      return Status::OK();
+    }
+    if (!feof(file_)) {
+      // A partial read with an error: return a non-ok status.
+      return IOError(filename_, errno);
+    }
+    // Hitting the end of the file is not an error; just shrink the slice to
+    // what was read.
+    result->truncate(bytes_read);
+    return Status::OK();
+  }
+
+  // Moves the stream position forward by 'n' bytes using fseek().
+  virtual Status Skip(uint64_t n) OVERRIDE {
+    TRACE_EVENT1("io", "PosixSequentialFile::Skip", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+    const int seek_rc = fseek(file_, n, SEEK_CUR);
+    if (seek_rc != 0) {
+      return IOError(filename_, errno);
+    }
+    return Status::OK();
+  }
+
+  virtual const string& filename() const OVERRIDE { return filename_; }
+
+ private:
+  std::string filename_;
+  FILE* file_;
+};
+
+// Random-access file backed by a file descriptor; reads are delegated to
+// DoReadV(). The descriptor is closed in the destructor.
+class PosixRandomAccessFile: public RandomAccessFile {
+ public:
+  PosixRandomAccessFile(std::string fname, int fd)
+      : filename_(std::move(fname)), fd_(fd) {}
+  virtual ~PosixRandomAccessFile() { close(fd_); }
+
+  // Single-slice read, expressed as a one-element ReadV().
+  virtual Status Read(uint64_t offset, Slice* result) const OVERRIDE {
+    vector<Slice> single(1, *result);
+    return ReadV(offset, &single);
+  }
+
+  virtual Status ReadV(uint64_t offset, vector<Slice>* results) const OVERRIDE {
+    return DoReadV(fd_, filename_, offset, results);
+  }
+
+  // Stores the st_size reported by fstat() in '*size'.
+  virtual Status Size(uint64_t *size) const OVERRIDE {
+    TRACE_EVENT1("io", "PosixRandomAccessFile::Size", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+    struct stat file_stat;
+    const int stat_rc = fstat(fd_, &file_stat);
+    if (stat_rc == -1) {
+      return IOError(filename_, errno);
+    }
+    *size = file_stat.st_size;
+    return Status::OK();
+  }
+
+  virtual const string& filename() const OVERRIDE { return filename_; }
+
+  // Usable size of this object plus the capacity of the filename string.
+  virtual size_t memory_footprint() const OVERRIDE {
+    const size_t object_bytes = kudu_malloc_usable_size(this);
+    return object_bytes + filename_.capacity();
+  }
+
+ private:
+  std::string filename_;
+  int fd_;
+};
+
+// Use non-memory mapped POSIX files to write data to a file.
+//
+// The object tracks the logical file size itself (filesize_) and appends at
+// that offset. It also remembers how far the file has been preallocated so
+// that Close() can trim unused space.
+//
+// TODO (perf) investigate zeroing a pre-allocated allocated area in
+// order to further improve Sync() performance.
+class PosixWritableFile : public WritableFile {
+ public:
+  // 'fd' is the open descriptor for 'fname'; 'file_size' is the offset at
+  // which the first append will be written.
+  PosixWritableFile(std::string fname, int fd, uint64_t file_size,
+                    bool sync_on_close)
+      : filename_(std::move(fname)),
+        fd_(fd),
+        sync_on_close_(sync_on_close),
+        filesize_(file_size),
+        pre_allocated_size_(0),
+        pending_sync_(false) {}
+
+  // Closes the file if Close() has not already reset fd_; a failure is only
+  // logged.
+  ~PosixWritableFile() {
+    if (fd_ >= 0) {
+      WARN_NOT_OK(Close(), "Failed to close " + filename_);
+    }
+  }
+
+  virtual Status Append(const Slice& data) OVERRIDE {
+    return AppendV({ data });
+  }
+
+  // Writes all slices at the current end of the file and advances filesize_
+  // by the total number of bytes in 'data'.
+  virtual Status AppendV(const vector<Slice> &data) OVERRIDE {
+    ThreadRestrictions::AssertIOAllowed();
+    RETURN_NOT_OK(DoWriteV(fd_, filename_, filesize_, data));
+    // Calculate the amount of data written. The running total is kept as a
+    // size_t throughout: accumulating through an 'int' parameter would narrow
+    // the total on every step and corrupt filesize_ for large appends.
+    size_t bytes_written = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
+                                      [](size_t sum, const Slice& curr) {
+                                        return sum + curr.size();
+                                      });
+    filesize_ += bytes_written;
+    pending_sync_ = true;
+    return Status::OK();
+  }
+
+  // Preallocates 'size' bytes past the larger of the current file size and
+  // the previously preallocated size. EOPNOTSUPP and ENOSYS from fallocate()
+  // are logged once and otherwise ignored.
+  virtual Status PreAllocate(uint64_t size) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+    TRACE_EVENT1("io", "PosixWritableFile::PreAllocate", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+    uint64_t offset = std::max(filesize_, pre_allocated_size_);
+    if (fallocate(fd_, 0, offset, size) < 0) {
+      if (errno == EOPNOTSUPP) {
+        KLOG_FIRST_N(WARNING, 1) << "The filesystem does not support fallocate().";
+      } else if (errno == ENOSYS) {
+        KLOG_FIRST_N(WARNING, 1) << "The kernel does not implement fallocate().";
+      } else {
+        return IOError(filename_, errno);
+      }
+    }
+    pre_allocated_size_ = offset + size;
+    return Status::OK();
+  }
+
+  // Trims unused preallocated space, syncs if sync_on_close was requested,
+  // then closes the descriptor. The first error encountered is the one
+  // returned; fd_ is reset to -1 in all cases once close() has been called.
+  virtual Status Close() OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+    TRACE_EVENT1("io", "PosixWritableFile::Close", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+    Status s;
+
+    // If we've allocated more space than we used, truncate to the
+    // actual size of the file and perform Sync().
+    if (filesize_ < pre_allocated_size_) {
+      int ret;
+      RETRY_ON_EINTR(ret, ftruncate(fd_, filesize_));
+      if (ret != 0) {
+        s = IOError(filename_, errno);
+        // NOTE(review): the sync is forced only when the truncate fails,
+        // while the comment above reads as if it should follow a successful
+        // truncate -- confirm which is intended.
+        pending_sync_ = true;
+      }
+    }
+
+    if (sync_on_close_) {
+      Status sync_status = Sync();
+      if (!sync_status.ok()) {
+        LOG(ERROR) << "Unable to Sync " << filename_ << ": " << sync_status.ToString();
+        if (s.ok()) {
+          s = sync_status;
+        }
+      }
+    }
+
+    if (close(fd_) < 0) {
+      if (s.ok()) {
+        s = IOError(filename_, errno);
+      }
+    }
+
+    fd_ = -1;
+    return s;
+  }
+
+  // On Linux, starts writeback of the whole file with sync_file_range() and,
+  // for FLUSH_SYNC, also waits for it. Elsewhere, FLUSH_SYNC falls back to
+  // fsync() and other modes do nothing.
+  virtual Status Flush(FlushMode mode) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+    TRACE_EVENT1("io", "PosixWritableFile::Flush", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+#if defined(__linux__)
+    int flags = SYNC_FILE_RANGE_WRITE;
+    if (mode == FLUSH_SYNC) {
+      flags |= SYNC_FILE_RANGE_WAIT_BEFORE;
+      flags |= SYNC_FILE_RANGE_WAIT_AFTER;
+    }
+    if (sync_file_range(fd_, 0, 0, flags) < 0) {
+      return IOError(filename_, errno);
+    }
+#else
+    if (mode == FLUSH_SYNC && fsync(fd_) < 0) {
+      return IOError(filename_, errno);
+    }
+#endif
+    return Status::OK();
+  }
+
+  // Calls DoSync() if data has been appended since the last Sync(); logs a
+  // warning when the whole call is slow.
+  virtual Status Sync() OVERRIDE {
+    TRACE_EVENT1("io", "PosixWritableFile::Sync", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+    LOG_SLOW_EXECUTION(WARNING, 1000, Substitute("sync call for $0", filename_)) {
+      if (pending_sync_) {
+        pending_sync_ = false;
+        RETURN_NOT_OK(DoSync(fd_, filename_));
+      }
+    }
+    return Status::OK();
+  }
+
+  // Returns the size tracked by this object, not a value queried from the
+  // filesystem.
+  virtual uint64_t Size() const OVERRIDE {
+    return filesize_;
+  }
+
+  virtual const string& filename() const OVERRIDE { return filename_; }
+
+ private:
+  const std::string filename_;
+  int fd_;                       // Reset to -1 by Close().
+  bool sync_on_close_;
+  uint64_t filesize_;            // Offset at which the next append lands.
+  uint64_t pre_allocated_size_;  // End of the preallocated region.
+
+  bool pending_sync_;            // True when there is unsynced data.
+};
+
+// RWFile implementation backed by a file descriptor. Reads and writes are
+// delegated to DoReadV()/DoWriteV(); the remaining operations map onto the
+// corresponding system calls.
+class PosixRWFile : public RWFile {
+ public:
+  PosixRWFile(string fname, int fd, bool sync_on_close)
+      : filename_(std::move(fname)),
+        fd_(fd),
+        sync_on_close_(sync_on_close),
+        pending_sync_(false),
+        closed_(false) {}
+
+  // Closes the file; a failure is only logged. Close() itself returns early
+  // if the file was already closed.
+  ~PosixRWFile() {
+    WARN_NOT_OK(Close(), "Failed to close " + filename_);
+  }
+
+  // Single-slice read, expressed as a one-element ReadV().
+  virtual Status Read(uint64_t offset, Slice* result) const OVERRIDE {
+    vector<Slice> results = { *result };
+    return ReadV(offset, &results);
+  }
+
+  virtual Status ReadV(uint64_t offset, vector<Slice>* results) const OVERRIDE {
+    return DoReadV(fd_, filename_, offset, results);
+  }
+
+  virtual Status Write(uint64_t offset, const Slice& data) OVERRIDE {
+    return WriteV(offset, { data });
+  }
+
+  // Writes 'data' at 'offset'. The file is marked as needing a sync whether
+  // or not the write succeeded.
+  virtual Status WriteV(uint64_t offset, const vector<Slice> &data) OVERRIDE {
+    Status s = DoWriteV(fd_, filename_, offset, data);
+    pending_sync_.Store(true);
+    return s;
+  }
+
+  // Preallocates 'length' bytes at 'offset' with fallocate(), passing
+  // FALLOC_FL_KEEP_SIZE when 'mode' is DONT_CHANGE_FILE_SIZE. EOPNOTSUPP and
+  // ENOSYS are logged once and otherwise ignored.
+  virtual Status PreAllocate(uint64_t offset,
+                             size_t length,
+                             PreAllocateMode mode) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+    TRACE_EVENT1("io", "PosixRWFile::PreAllocate", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+    int falloc_mode = 0;
+    if (mode == DONT_CHANGE_FILE_SIZE) {
+      falloc_mode = FALLOC_FL_KEEP_SIZE;
+    }
+    if (fallocate(fd_, falloc_mode, offset, length) < 0) {
+      if (errno == EOPNOTSUPP) {
+        KLOG_FIRST_N(WARNING, 1) << "The filesystem does not support fallocate().";
+      } else if (errno == ENOSYS) {
+        KLOG_FIRST_N(WARNING, 1) << "The kernel does not implement fallocate().";
+      } else {
+        return IOError(filename_, errno);
+      }
+    }
+    return Status::OK();
+  }
+
+  // Sets the file length to 'length' with ftruncate(). Failures are reported
+  // as an IOError status built here rather than through IOError().
+  virtual Status Truncate(uint64_t length) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+    TRACE_EVENT2("io", "PosixRWFile::Truncate", "path", filename_, "length", length);
+    ThreadRestrictions::AssertIOAllowed();
+    int ret;
+    RETRY_ON_EINTR(ret, ftruncate(fd_, length));
+    if (ret != 0) {
+      // Capture errno before any further call can overwrite it.
+      int err = errno;
+      return Status::IOError(Substitute("Unable to truncate file $0", filename_),
+                             Substitute("ftruncate() failed: $0", ErrnoToString(err)),
+                             err);
+    }
+    return Status::OK();
+  }
+
+  // Deallocates 'length' bytes at 'offset' without changing the file size.
+  // Returns NotSupported on platforms other than Linux.
+  virtual Status PunchHole(uint64_t offset, size_t length) OVERRIDE {
+#if defined(__linux__)
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+    TRACE_EVENT1("io", "PosixRWFile::PunchHole", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+    if (fallocate(fd_, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, length) < 0) {
+      return IOError(filename_, errno);
+    }
+    return Status::OK();
+#else
+    return Status::NotSupported("Hole punching not supported on this platform");
+#endif
+  }
+
+  // On Linux, starts writeback of the given range with sync_file_range() and,
+  // for FLUSH_SYNC, also waits afterwards. Elsewhere, FLUSH_SYNC falls back
+  // to fsync() of the whole file and other modes do nothing.
+  //
+  // NOTE(review): PosixWritableFile::Flush also sets
+  // SYNC_FILE_RANGE_WAIT_BEFORE for FLUSH_SYNC; this method does not --
+  // confirm the difference is intentional.
+  virtual Status Flush(FlushMode mode, uint64_t offset, size_t length) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+    TRACE_EVENT1("io", "PosixRWFile::Flush", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+#if defined(__linux__)
+    int flags = SYNC_FILE_RANGE_WRITE;
+    if (mode == FLUSH_SYNC) {
+      flags |= SYNC_FILE_RANGE_WAIT_AFTER;
+    }
+    if (sync_file_range(fd_, offset, length, flags) < 0) {
+      return IOError(filename_, errno);
+    }
+#else
+    if (mode == FLUSH_SYNC && fsync(fd_) < 0) {
+      return IOError(filename_, errno);
+    }
+#endif
+    return Status::OK();
+  }
+
+  // Syncs the file if a write has happened since the last Sync().
+  // pending_sync_ is cleared before DoSync() runs, so a failed sync is not
+  // retried by a later call unless another write intervenes.
+  virtual Status Sync() OVERRIDE {
+    if (!pending_sync_.CompareAndSwap(true, false)) {
+      return Status::OK();
+    }
+
+    TRACE_EVENT1("io", "PosixRWFile::Sync", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+    LOG_SLOW_EXECUTION(WARNING, 1000, Substitute("sync call for $0", filename())) {
+      RETURN_NOT_OK(DoSync(fd_, filename_));
+    }
+    return Status::OK();
+  }
+
+  // Syncs (if sync_on_close was requested) and closes the descriptor. Calls
+  // made after a completed Close() return OK immediately. A sync error takes
+  // precedence over a close() error in the returned status.
+  virtual Status Close() OVERRIDE {
+    if (closed_) {
+      return Status::OK();
+    }
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+    TRACE_EVENT1("io", "PosixRWFile::Close", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+    Status s;
+
+    if (sync_on_close_) {
+      s = Sync();
+      if (!s.ok()) {
+        LOG(ERROR) << "Unable to Sync " << filename_ << ": " << s.ToString();
+      }
+    }
+
+    if (close(fd_) < 0) {
+      if (s.ok()) {
+        s = IOError(filename_, errno);
+      }
+    }
+
+    closed_ = true;
+    return s;
+  }
+
+  // Stores the st_size reported by fstat() in '*size'.
+  virtual Status Size(uint64_t* size) const OVERRIDE {
+    TRACE_EVENT1("io", "PosixRWFile::Size", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+    struct stat st;
+    if (fstat(fd_, &st) == -1) {
+      return IOError(filename_, errno);
+    }
+    *size = st.st_size;
+    return Status::OK();
+  }
+
+  // Builds a map from logical offset to length for every extent reported by
+  // the FS_IOC_FIEMAP ioctl and swaps it into '*out'. '*out' is modified only
+  // on success. Returns NotSupported on platforms other than Linux.
+  virtual Status GetExtentMap(ExtentMap* out) const OVERRIDE {
+#if !defined(__linux__)
+    return Status::NotSupported("GetExtentMap not supported on this platform");
+#else
+    TRACE_EVENT1("io", "PosixRWFile::GetExtentMap", "path", filename_);
+    ThreadRestrictions::AssertIOAllowed();
+
+    // This allocation size is arbitrary.
+    static const int kBufSize = 4096;
+    uint8_t buf[kBufSize] = { 0 };
+
+    // The buffer holds the fiemap header followed by as many extent records
+    // as fit in the remaining space.
+    struct fiemap* fm = reinterpret_cast<struct fiemap*>(buf);
+    struct fiemap_extent* fme = &fm->fm_extents[0];
+    int avail_extents_in_buffer = (kBufSize - sizeof(*fm)) / sizeof(*fme);
+    bool saw_last_extent = false;
+    ExtentMap extents;
+    do {
+      // Fetch another block of extents.
+      fm->fm_length = FIEMAP_MAX_OFFSET;
+      fm->fm_extent_count = avail_extents_in_buffer;
+      if (ioctl(fd_, FS_IOC_FIEMAP, fm) == -1) {
+        return IOError(filename_, errno);
+      }
+
+      // No extents returned, this file must have no extents.
+      if (fm->fm_mapped_extents == 0) {
+        break;
+      }
+
+      // Parse the extent block.
+      // last_extent_end_offset is assigned in the loop below, which runs at
+      // least once because fm_mapped_extents is non-zero here.
+      uint64_t last_extent_end_offset;
+      for (int i = 0; i < fm->fm_mapped_extents; i++) {
+        if (fme[i].fe_flags & FIEMAP_EXTENT_LAST) {
+          // This should really be the last extent.
+          CHECK_EQ(fm->fm_mapped_extents - 1, i);
+
+          saw_last_extent = true;
+        }
+        InsertOrDie(&extents, fme[i].fe_logical, fme[i].fe_length);
+        VLOG(3) << Substitute("File $0 extent $1: o $2, l $3 $4",
+                              filename_, i,
+                              fme[i].fe_logical, fme[i].fe_length,
+                              saw_last_extent ? "(final)" : "");
+        last_extent_end_offset = fme[i].fe_logical + fme[i].fe_length;
+        if (saw_last_extent) {
+          break;
+        }
+      }
+
+      // Resume the next request right after the last extent seen.
+      fm->fm_start = last_extent_end_offset;
+    } while (!saw_last_extent);
+
+    out->swap(extents);
+    return Status::OK();
+#endif
+  }
+
+  virtual const string& filename() const OVERRIDE {
+    return filename_;
+  }
+
+ private:
+  const std::string filename_;
+  const int fd_;
+  const bool sync_on_close_;
+
+  // Set by WriteV(), cleared by Sync().
+  AtomicBool pending_sync_;
+  // Set once Close() has run to completion.
+  bool closed_;
+};
+
+// Places (lock == true) or releases (lock == false) a write lock covering
+// the entire file behind 'fd', using fcntl(F_SETLK). errno is reset to 0
+// before the call; the fcntl() result is returned unchanged.
+int LockOrUnlock(int fd, bool lock) {
+  ThreadRestrictions::AssertIOAllowed();
+
+  struct flock request;
+  memset(&request, 0, sizeof(request));
+  request.l_whence = SEEK_SET;
+  request.l_type = lock ? F_WRLCK : F_UNLCK;
+  // A start and length of 0 cover the entire file.
+  request.l_start = 0;
+  request.l_len = 0;
+
+  errno = 0;
+  return fcntl(fd, F_SETLK, &request);
+}
+
+// FileLock subclass that carries a file descriptor.
+class PosixFileLock : public FileLock {
+ public:
+  // Not initialized by this class; whoever creates the object must set it.
+  // Presumably the descriptor of the locked file -- confirm against the code
+  // that creates PosixFileLock objects.
+  int fd_;
+};
+
+class PosixEnv : public Env {
+ public:
+  PosixEnv();
+  virtual ~PosixEnv() {
+    fprintf(stderr, "Destroying Env::Default()\n");
+    exit(1);
+  }
+
+  virtual Status NewSequentialFile(const std::string& fname,
+                                   unique_ptr<SequentialFile>* result) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::NewSequentialFile", "path", fname);
+    ThreadRestrictions::AssertIOAllowed();
+    FILE* f = fopen(fname.c_str(), "r");
+    if (f == nullptr) {
+      return IOError(fname, errno);
+    } else {
+      result->reset(new PosixSequentialFile(fname, f));
+      return Status::OK();
+    }
+  }
+
+  virtual Status NewRandomAccessFile(const std::string& fname,
+                                     unique_ptr<RandomAccessFile>* result) OVERRIDE {
+    return NewRandomAccessFile(RandomAccessFileOptions(), fname, result);
+  }
+
+  virtual Status NewRandomAccessFile(const RandomAccessFileOptions& opts,
+                                     const std::string& fname,
+                                     unique_ptr<RandomAccessFile>* result) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::NewRandomAccessFile", "path", fname);
+    ThreadRestrictions::AssertIOAllowed();
+    int fd = open(fname.c_str(), O_RDONLY);
+    if (fd < 0) {
+      return IOError(fname, errno);
+    }
+
+    result->reset(new PosixRandomAccessFile(fname, fd));
+    return Status::OK();
+  }
+
+  virtual Status NewWritableFile(const std::string& fname,
+                                 unique_ptr<WritableFile>* result) OVERRIDE {
+    return NewWritableFile(WritableFileOptions(), fname, result);
+  }
+
+  virtual Status NewWritableFile(const WritableFileOptions& opts,
+                                 const std::string& fname,
+                                 unique_ptr<WritableFile>* result) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::NewWritableFile", "path", fname);
+    int fd;
+    RETURN_NOT_OK(DoOpen(fname, opts.mode, &fd));
+    return InstantiateNewWritableFile(fname, fd, opts, result);
+  }
+
+  virtual Status NewTempWritableFile(const WritableFileOptions& opts,
+                                     const std::string& name_template,
+                                     std::string* created_filename,
+                                     unique_ptr<WritableFile>* result) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::NewTempWritableFile", "template", name_template);
+    int fd;
+    string tmp_filename;
+    RETURN_NOT_OK(MkTmpFile(name_template, &fd, &tmp_filename));
+    RETURN_NOT_OK(InstantiateNewWritableFile(tmp_filename, fd, opts, result));
+    created_filename->swap(tmp_filename);
+    return Status::OK();
+  }
+
+  virtual Status NewRWFile(const string& fname,
+                           unique_ptr<RWFile>* result) OVERRIDE {
+    return NewRWFile(RWFileOptions(), fname, result);
+  }
+
+  virtual Status NewRWFile(const RWFileOptions& opts,
+                           const string& fname,
+                           unique_ptr<RWFile>* result) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::NewRWFile", "path", fname);
+    int fd;
+    RETURN_NOT_OK(DoOpen(fname, opts.mode, &fd));
+    result->reset(new PosixRWFile(fname, fd, opts.sync_on_close));
+    return Status::OK();
+  }
+
+  // Creates a temporary file from 'name_template' (see MkTmpFile()), stores its
+  // name in '*created_filename', and returns a PosixRWFile for it through 'res'.
+  virtual Status NewTempRWFile(const RWFileOptions& opts, const std::string& name_template,
+                               std::string* created_filename, unique_ptr<RWFile>* res) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::NewTempRWFile", "template", name_template);
+    int tmp_fd;
+    RETURN_NOT_OK(MkTmpFile(name_template, &tmp_fd, created_filename));
+    PosixRWFile* rw_file = new PosixRWFile(*created_filename, tmp_fd, opts.sync_on_close);
+    res->reset(rw_file);
+    return Status::OK();
+  }
+
+  // Returns true iff access(2) with F_OK succeeds for 'fname'. Any failure
+  // (not just a missing file) yields false.
+  virtual bool FileExists(const std::string& fname) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::FileExists", "path", fname);
+    ThreadRestrictions::AssertIOAllowed();
+    const int rc = access(fname.c_str(), F_OK);
+    return rc == 0;
+  }
+
+  // Lists the entries of directory 'dir' into '*result' (cleared first).
+  //
+  // Names are reported exactly as readdir(3) returns them, so the "." and ".."
+  // entries are included and the order is unspecified.
+  //
+  // Returns an IOError if the directory cannot be opened or if reading it
+  // fails part-way through; in the latter case '*result' holds the entries
+  // that were read before the failure.
+  virtual Status GetChildren(const std::string& dir,
+                             std::vector<std::string>* result) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::GetChildren", "path", dir);
+    ThreadRestrictions::AssertIOAllowed();
+    result->clear();
+    DIR* d = opendir(dir.c_str());
+    if (d == nullptr) {
+      return IOError(dir, errno);
+    }
+    // readdir() returns nullptr both at end-of-directory and on error; the two
+    // cases can only be told apart by clearing errno before each call and
+    // inspecting it afterwards. (readdir_r() is deprecated; readdir() is safe
+    // here because the DIR stream is private to this call.)
+    for (;;) {
+      errno = 0;
+      struct dirent* entry = readdir(d);
+      if (entry == nullptr) {
+        break;
+      }
+      result->push_back(entry->d_name);
+    }
+    // Capture errno before closedir() has a chance to overwrite it.
+    const int readdir_errno = errno;
+    closedir(d);
+    if (readdir_errno != 0) {
+      return IOError(dir, readdir_errno);
+    }
+    return Status::OK();
+  }
+
+  // Removes the file 'fname' with unlink(2); returns an IOError built from
+  // errno on failure.
+  virtual Status DeleteFile(const std::string& fname) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::DeleteFile", "path", fname);
+    ThreadRestrictions::AssertIOAllowed();
+    if (unlink(fname.c_str()) != 0) {
+      return IOError(fname, errno);
+    }
+    return Status::OK();
+  }
+
+  // Creates directory 'name' with mkdir(2) and mode 0777; returns an IOError
+  // built from errno on failure (including when the path already exists).
+  virtual Status CreateDir(const std::string& name) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::CreateDir", "path", name);
+    ThreadRestrictions::AssertIOAllowed();
+    if (mkdir(name.c_str(), 0777) != 0) {
+      return IOError(name, errno);
+    }
+    return Status::OK();
+  }
+
+  // Removes directory 'name' with rmdir(2); returns an IOError built from
+  // errno on failure.
+  virtual Status DeleteDir(const std::string& name) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::DeleteDir", "path", name);
+    ThreadRestrictions::AssertIOAllowed();
+    if (rmdir(name.c_str()) != 0) {
+      return IOError(name, errno);
+    }
+    return Status::OK();
+  }
+
+  // Stores the process's current working directory in '*cwd'.
+  Status GetCurrentWorkingDir(string* cwd) const override {
+    TRACE_EVENT0("io", "PosixEnv::GetCurrentWorkingDir");
+    ThreadRestrictions::AssertIOAllowed();
+    // Called with a null buffer and zero size, getcwd() returns a buffer it
+    // allocated itself; the unique_ptr takes ownership of it.
+    unique_ptr<char, FreeDeleter> cwd_buf(getcwd(nullptr, 0));
+    if (cwd_buf.get() == nullptr) {
+      return IOError("getcwd()", errno);
+    }
+    *cwd = cwd_buf.get();
+    return Status::OK();
+  }
+
+  // Changes the process's working directory to 'dest' with chdir(2); returns an
+  // IOError built from errno on failure.
+  Status ChangeDir(const string& dest) override {
+    TRACE_EVENT1("io", "PosixEnv::ChangeDir", "dest", dest);
+    ThreadRestrictions::AssertIOAllowed();
+    if (chdir(dest.c_str()) != 0) {
+      return IOError(dest, errno);
+    }
+    return Status::OK();
+  }
+
+  // Opens directory 'dirname' and fsync()s it. Does nothing and returns OK
+  // when FLAGS_never_fsync is set.
+  virtual Status SyncDir(const std::string& dirname) OVERRIDE {
+    TRACE_EVENT1("io", "SyncDir", "path", dirname);
+    ThreadRestrictions::AssertIOAllowed();
+    if (FLAGS_never_fsync) {
+      return Status::OK();
+    }
+    const int dir_fd = open(dirname.c_str(), O_DIRECTORY|O_RDONLY);
+    if (dir_fd == -1) {
+      return IOError(dirname, errno);
+    }
+    // Hand the descriptor to the scoped closer for the rest of this function.
+    ScopedFdCloser fd_closer(dir_fd);
+    if (fsync(dir_fd) != 0) {
+      return IOError(dirname, errno);
+    }
+    return Status::OK();
+  }
+
+  // Deletes the tree rooted at 'name' by walking it in post-order (children
+  // before their parent directory) and running DeleteRecursivelyCb() on each
+  // entry.
+  virtual Status DeleteRecursively(const std::string &name) OVERRIDE {
+    const WalkCallback delete_cb = Bind(&PosixEnv::DeleteRecursivelyCb,
+                                        Unretained(this));
+    return Walk(name, POST_ORDER, delete_cb);
+  }
+
+  // Stores the st_size reported by stat(2) for 'fname' in '*size'. On failure
+  // an IOError is returned and '*size' is left untouched.
+  virtual Status GetFileSize(const std::string& fname, uint64_t* size) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::GetFileSize", "path", fname);
+    ThreadRestrictions::AssertIOAllowed();
+    struct stat file_stat;
+    if (stat(fname.c_str(), &file_stat) != 0) {
+      return IOError(fname, errno);
+    }
+    *size = file_stat.st_size;
+    return Status::OK();
+  }
+
+  // Stores the number of bytes actually allocated to 'fname' in '*size'. On
+  // failure an IOError is returned and '*size' is left untouched.
+  virtual Status GetFileSizeOnDisk(const std::string& fname, uint64_t* size) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::GetFileSizeOnDisk", "path", fname);
+    ThreadRestrictions::AssertIOAllowed();
+    struct stat file_stat;
+    if (stat(fname.c_str(), &file_stat) != 0) {
+      return IOError(fname, errno);
+    }
+    // From stat(2):
+    //
+    //   The st_blocks field indicates the number of blocks allocated to
+    //   the file, 512-byte units. (This may be smaller than st_size/512
+    //   when the file has holes.)
+    *size = file_stat.st_blocks * 512;
+    return Status::OK();
+  }
+
+  // Sums the on-disk size of every non-directory entry under 'root' and stores
+  // the total in '*bytes_used'. '*bytes_used' is only written if the whole walk
+  // succeeds.
+  virtual Status GetFileSizeOnDiskRecursively(const string& root,
+                                              uint64_t* bytes_used) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::GetFileSizeOnDiskRecursively", "path", root);
+    // Accumulate into a local so the output is untouched on failure.
+    uint64_t accumulated = 0;
+    RETURN_NOT_OK(Walk(root, Env::PRE_ORDER,
+                       Bind(&PosixEnv::GetFileSizeOnDiskRecursivelyCb,
+                            Unretained(this), &accumulated)));
+    *bytes_used = accumulated;
+    return Status::OK();
+  }
+
+  // Stores the st_blksize reported by stat(2) for 'fname' in '*block_size'. On
+  // failure an IOError is returned and '*block_size' is left untouched.
+  virtual Status GetBlockSize(const string& fname, uint64_t* block_size) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::GetBlockSize", "path", fname);
+    ThreadRestrictions::AssertIOAllowed();
+    struct stat file_stat;
+    if (stat(fname.c_str(), &file_stat) != 0) {
+      return IOError(fname, errno);
+    }
+    *block_size = file_stat.st_blksize;
+    return Status::OK();
+  }
+
+  // Stores the last-modification time of 'fname', as reported by stat(2), in
+  // '*timestamp', expressed in microseconds (seconds * 1000000 plus the
+  // nanosecond part truncated to whole microseconds).
+  //
+  // Returns an IOError built from errno if stat() fails; '*timestamp' is left
+  // untouched in that case.
+  virtual Status GetFileModifiedTime(const string& fname, int64_t* timestamp) override {
+    TRACE_EVENT1("io", "PosixEnv::GetFileModifiedTime", "fname", fname);
+    ThreadRestrictions::AssertIOAllowed();
+
+    struct stat s;
+    if (stat(fname.c_str(), &s) != 0) {
+      return IOError(fname, errno);
+    }
+    // The modification-time field is named differently on macOS.
+#ifdef __APPLE__
+    const struct timespec& mtime = s.st_mtimespec;
+#else
+    const struct timespec& mtime = s.st_mtim;
+#endif
+    // Use integer arithmetic: the floating-point form (tv_sec * 1e6 +
+    // tv_nsec / 1e3) rounds to the nearest representable double before being
+    // truncated, which can bump the result up by a microsecond and loses
+    // precision altogether for large values.
+    *timestamp = static_cast<int64_t>(mtime.tv_sec) * 1000000 + mtime.tv_nsec / 1000;
+    return Status::OK();
+  }
+
+  // Helper that runs statvfs() on 'path', retrying when interrupted (EINTR),
+  // and converts a failure into an IOError.
+  static Status StatVfs(const string& path, struct statvfs* buf) {
+    ThreadRestrictions::AssertIOAllowed();
+    int rc;
+    RETRY_ON_EINTR(rc, statvfs(path.c_str(), buf));
+    if (rc != -1) {
+      return Status::OK();
+    }
+    return IOError(Substitute("statvfs: $0", path), errno);
+  }
+
+  // Fills '*space_info' with the capacity and the free space (as available to
+  // non-privileged processes) of the filesystem containing 'path', in bytes.
+  virtual Status GetSpaceInfo(const string& path, SpaceInfo* space_info) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::GetSpaceInfo", "path", path);
+    struct statvfs vfs;
+    RETURN_NOT_OK(StatVfs(path, &vfs));
+    // statvfs block counts are expressed in units of f_frsize.
+    space_info->capacity_bytes = vfs.f_frsize * vfs.f_blocks;
+    space_info->free_bytes = vfs.f_frsize * vfs.f_bavail;
+    return Status::OK();
+  }
+
+  // Renames 'src' to 'target' with rename(2). When the
+  // FLAGS_env_inject_io_error fault-injection hook fires, an injected IOError
+  // is returned before anything is touched.
+  virtual Status RenameFile(const std::string& src, const std::string& target) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+    TRACE_EVENT2("io", "PosixEnv::RenameFile", "src", src, "dst", target);
+    ThreadRestrictions::AssertIOAllowed();
+    if (rename(src.c_str(), target.c_str()) != 0) {
+      // The error is reported against the source path.
+      return IOError(src, errno);
+    }
+    return Status::OK();
+  }
+
+  // Opens (creating if necessary, mode 0666) the file 'fname' and acquires a
+  // lock on it via LockOrUnlock(). On success '*lock' receives a heap-allocated
+  // PosixFileLock holding the descriptor; on failure '*lock' is nullptr.
+  virtual Status LockFile(const std::string& fname, FileLock** lock) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::LockFile", "path", fname);
+    ThreadRestrictions::AssertIOAllowed();
+    *lock = nullptr;
+    const int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0666);
+    if (fd < 0) {
+      return IOError(fname, errno);
+    }
+    if (LockOrUnlock(fd, true) == -1) {
+      // Build the status before close() can clobber errno.
+      Status lock_failure = IOError("lock " + fname, errno);
+      close(fd);
+      return lock_failure;
+    }
+    PosixFileLock* held = new PosixFileLock;
+    held->fd_ = fd;
+    *lock = held;
+    return Status::OK();
+  }
+
+  // Releases a lock previously returned by LockFile().
+  //
+  // 'lock' must point to the PosixFileLock that LockFile() allocated. Its
+  // descriptor is closed and the object is deleted whether or not the unlock
+  // call itself succeeds, so 'lock' must not be used afterwards. Returns an
+  // IOError if LockOrUnlock() reports a failure.
+  virtual Status UnlockFile(FileLock* lock) OVERRIDE {
+    TRACE_EVENT0("io", "PosixEnv::UnlockFile");
+    ThreadRestrictions::AssertIOAllowed();
+    // Base-to-derived conversion within the class hierarchy: static_cast is
+    // the correct cast here (reinterpret_cast performs no pointer adjustment
+    // and is not meant for hierarchy navigation).
+    PosixFileLock* my_lock = static_cast<PosixFileLock*>(lock);
+    Status result;
+    if (LockOrUnlock(my_lock->fd_, false) == -1) {
+      result = IOError("unlock", errno);
+    }
+    close(my_lock->fd_);
+    delete my_lock;
+    return result;
+  }
+
+  // Picks a directory for test data: $TEST_TMPDIR when set and non-empty,
+  // otherwise /tmp/kudutest-<effective uid>. The directory is created on a
+  // best-effort basis and its canonical path is stored in '*result'.
+  virtual Status GetTestDirectory(std::string* result) OVERRIDE {
+    string dir;
+    const char* override_dir = getenv("TEST_TMPDIR");
+    if (override_dir != nullptr && override_dir[0] != '\0') {
+      dir = override_dir;
+    } else {
+      char default_dir[100];
+      snprintf(default_dir, sizeof(default_dir), "/tmp/kudutest-%d",
+               static_cast<int>(geteuid()));
+      dir = default_dir;
+    }
+    // The directory may already exist, so a failure here is not an error.
+    ignore_result(CreateDir(dir));
+    // /tmp may be a symlink, so canonicalize the path.
+    return Canonicalize(dir, result);
+  }
+
+  // Returns a platform-independent ID for the calling thread.
+  //
+  // pthread_self() is not usable here because it returns a totally opaque ID
+  // that can't be compared via normal means. Instead, the first call that finds
+  // 'thread_local_id' still zero takes the next value of a shared atomic
+  // counter, and later calls return the stored value.
+  virtual uint64_t gettid() OVERRIDE {
+    if (thread_local_id != 0) {
+      return thread_local_id;
+    }
+    thread_local_id = Barrier_AtomicIncrement(&cur_thread_local_id_, 1);
+    return thread_local_id;
+  }
+
+  // Returns the current gettimeofday() time as a count of microseconds.
+  virtual uint64_t NowMicros() OVERRIDE {
+    struct timeval now;
+    gettimeofday(&now, nullptr);
+    const uint64_t whole_seconds_us = static_cast<uint64_t>(now.tv_sec) * 1000000;
+    return whole_seconds_us + now.tv_usec;
+  }
+
+  // Blocks the calling thread for 'micros' microseconds. Asserts that the
+  // thread is allowed to wait.
+  virtual void SleepForMicroseconds(int micros) OVERRIDE {
+    ThreadRestrictions::AssertWaitAllowed();
+    const MonoDelta delay = MonoDelta::FromMicroseconds(micros);
+    SleepFor(delay);
+  }
+
+  // Stores the path of the running executable in '*path'.
+  //
+  // The path length is not known up front, so the lookup starts with a 64-byte
+  // buffer and is repeated with a larger one until the result fits.
+  virtual Status GetExecutablePath(string* path) OVERRIDE {
+    uint32_t capacity = 64;
+    uint32_t len = 0;
+    for (;;) {
+      unique_ptr<char[]> candidate(new char[capacity]);
+#if defined(__linux__)
+      int rc = readlink("/proc/self/exe", candidate.get(), capacity);
+      if (rc == -1) {
+        return IOError("Unable to determine own executable path", errno);
+      }
+      if (rc >= capacity) {
+        // A completely filled buffer may mean the link target was truncated;
+        // double the capacity and try again.
+        capacity *= 2;
+        continue;
+      }
+      len = rc;
+#elif defined(__APPLE__)
+      if (_NSGetExecutablePath(candidate.get(), &capacity) != 0) {
+        // The buffer wasn't large enough; 'capacity' has been updated.
+        continue;
+      }
+      len = strlen(candidate.get());
+#else
+#error Unsupported platform
+#endif
+
+      path->assign(candidate.get(), len);
+      return Status::OK();
+    }
+  }
+
+  // Sets '*is_dir' to whether 'path' refers to a directory according to
+  // stat(2). On failure an IOError is returned and '*is_dir' is left untouched.
+  virtual Status IsDirectory(const string& path, bool* is_dir) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::IsDirectory", "path", path);
+    ThreadRestrictions::AssertIOAllowed();
+    struct stat path_stat;
+    if (stat(path.c_str(), &path_stat) != 0) {
+      return IOError(path, errno);
+    }
+    *is_dir = S_ISDIR(path_stat.st_mode);
+    return Status::OK();
+  }
+
+  // Traverses the file tree rooted at 'root' with fts(3), invoking 'cb' with
+  // (type, parent directory, entry name) for every regular file, symlink, or
+  // file of unknown type, and for every directory either before its contents
+  // (PRE_ORDER) or after them (POST_ORDER), as selected by 'order'.
+  //
+  // The traversal keeps going after a failure. If fts reports FTS_ERR for any
+  // entry, or any callback invocation returns a non-OK status, an IOError is
+  // returned once the walk has finished.
+  //
+  // NOTE(review): entries reported with any other fts_info code are logged but
+  // do not make the walk fail -- confirm that this is intended.
+  virtual Status Walk(const string& root, DirectoryOrder order, const WalkCallback& cb) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::Walk", "path", root);
+    ThreadRestrictions::AssertIOAllowed();
+    // Refuse a handful of roots that are almost certainly a mistake.
+    CHECK_NE(root, "/");
+    CHECK_NE(root, "./");
+    CHECK_NE(root, ".");
+    CHECK_NE(root, "");
+
+    // fts_open() takes non-const C strings, so give it a heap copy of the root
+    // that is released automatically when this function returns.
+    unique_ptr<char, FreeDeleter> root_copy(strdup(root.c_str()));
+    char* fts_roots[] = { root_copy.get(), nullptr };
+
+    // FTS_NOCHDIR is important here to make this thread-safe.
+    unique_ptr<FTS, FtsCloser> fts(
+        fts_open(fts_roots, FTS_PHYSICAL | FTS_XDEV | FTS_NOCHDIR, nullptr));
+    if (!fts) {
+      return IOError(root, errno);
+    }
+
+    bool saw_failure = false;
+    for (FTSENT* entry = fts_read(fts.get());
+         entry != nullptr;
+         entry = fts_read(fts.get())) {
+      bool invoke_cb = false;
+      FileType type = DIRECTORY_TYPE;
+      switch (entry->fts_info) {
+        case FTS_D:         // Directory in pre-order
+          invoke_cb = (order == PRE_ORDER);
+          break;
+        case FTS_DP:        // Directory in post-order
+          invoke_cb = (order == POST_ORDER);
+          break;
+        case FTS_F:         // A regular file
+        case FTS_SL:        // A symbolic link
+        case FTS_SLNONE:    // A broken symbolic link
+        case FTS_DEFAULT:   // Unknown type of file
+          type = FILE_TYPE;
+          invoke_cb = true;
+          break;
+
+        case FTS_ERR:
+          LOG(WARNING) << "Unable to access file " << entry->fts_path
+                       << " during walk: " << strerror(entry->fts_errno);
+          saw_failure = true;
+          break;
+
+        default:
+          LOG(WARNING) << "Unable to access file " << entry->fts_path
+                       << " during walk (code " << entry->fts_info << ")";
+          break;
+      }
+      if (!invoke_cb) {
+        continue;
+      }
+      // A failing callback is remembered but does not stop the traversal.
+      Status cb_status = cb.Run(type, DirName(entry->fts_path), entry->fts_name);
+      if (!cb_status.ok()) {
+        saw_failure = true;
+      }
+    }
+
+    if (saw_failure) {
+      return Status::IOError(root, "One or more errors occurred");
+    }
+    return Status::OK();
+  }
+
+  // Expands 'path_pattern' with glob(3) (tilde expansion enabled, aborting on
+  // read errors) and appends every match to '*paths'. A pattern that matches
+  // nothing is not an error: OK is returned and '*paths' is left unchanged.
+  Status Glob(const string& path_pattern, vector<string>* paths) override {
+    TRACE_EVENT1("io", "PosixEnv::Glob", "path_pattern", path_pattern);
+    ThreadRestrictions::AssertIOAllowed();
+
+    glob_t matches;
+    // Release whatever glob() allocated on every exit path.
+    auto free_matches = MakeScopedCleanup([&] { globfree(&matches); });
+
+    const int rc = glob(path_pattern.c_str(), GLOB_TILDE | GLOB_ERR, nullptr, &matches);
+    if (rc == GLOB_NOMATCH) {
+      return Status::OK();
+    }
+    if (rc == GLOB_NOSPACE) {
+      return Status::RuntimeError("glob out of memory");
+    }
+    if (rc != 0) {
+      return Status::IOError("glob failure", std::to_string(rc));
+    }
+
+    for (size_t idx = 0; idx < matches.gl_pathc; ++idx) {
+      paths->emplace_back(matches.gl_pathv[idx]);
+    }
+    return Status::OK();
+  }
+
+  // Resolves 'path' with realpath(3) and stores the canonical form in
+  // '*result'. Returns an IOError if the path cannot be resolved.
+  virtual Status Canonicalize(const string& path, string* result) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::Canonicalize", "path", path);
+    ThreadRestrictions::AssertIOAllowed();
+    // With a null output buffer realpath() allocates the result itself; the
+    // unique_ptr takes ownership of that allocation.
+    unique_ptr<char[], FreeDeleter> resolved(realpath(path.c_str(), nullptr));
+    if (resolved.get() == nullptr) {
+      return IOError(Substitute("Unable to canonicalize $0", path), errno);
+    }
+    result->assign(resolved.get());
+    return Status::OK();
+  }
+
+  // Stores the machine's total physical memory, in bytes, in '*ram'.
+  //
+  // On macOS the value comes from sysctl(CTL_HW, HW_MEMSIZE) and a failure is
+  // fatal (CHECK_ERR). Elsewhere it comes from sysinfo(2), and a failure is
+  // returned as an IOError.
+  virtual Status GetTotalRAMBytes(int64_t* ram) OVERRIDE {
+#if defined(__APPLE__)
+    int mib[2];
+    size_t length = sizeof(*ram);
+
+    // Get the Physical memory size
+    mib[0] = CTL_HW;
+    mib[1] = HW_MEMSIZE;
+    CHECK_ERR(sysctl(mib, 2, ram, &length, nullptr, 0)) << "sysctl CTL_HW HW_MEMSIZE failed";
+#else
+    struct sysinfo info;
+    if (sysinfo(&info) < 0) {
+      return IOError("sysinfo() failed", errno);
+    }
+    // sysinfo() reports memory sizes as multiples of 'mem_unit' bytes. The
+    // unit is usually 1, but it need not be, so it must be applied to obtain
+    // a byte count.
+    *ram = static_cast<int64_t>(info.totalram) * info.mem_unit;
+#endif
+    return Status::OK();
+  }
+
+  // Returns the process's current (soft) RLIMIT_NOFILE value. A getrlimit()
+  // failure is fatal.
+  virtual int64_t GetOpenFileLimit() OVERRIDE {
+    struct rlimit nofile_limit;
+    // There's no reason for this to ever fail.
+    PCHECK(getrlimit(RLIMIT_NOFILE, &nofile_limit) == 0);
+    return nofile_limit.rlim_cur;
+  }
+
+  // Raises the soft RLIMIT_NOFILE limit to the hard limit when it is currently
+  // lower, logging what was done either way. Failures of the underlying system
+  // calls are fatal.
+  virtual void IncreaseOpenFileLimit() OVERRIDE {
+    // There's no reason for this to ever fail; any process should have
+    // sufficient privilege to increase its soft limit up to the hard limit.
+    //
+    // This change is logged because it is process-wide.
+    struct rlimit nofile_limit;
+    PCHECK(getrlimit(RLIMIT_NOFILE, &nofile_limit) == 0);
+#if defined(__APPLE__)
+    // OS X 10.11 can return RLIM_INFINITY from getrlimit, but allows rlim_cur and
+    // rlim_max to be raised only as high as the value of the maxfilesperproc
+    // kernel variable. Empirically, this value is 10240 across all tested macOS
+    // versions. Testing on OS X 10.10 and macOS 10.12 revealed that getrlimit
+    // returns the true limits (not RLIM_INFINITY), rlim_max can *not* be raised
+    // (when running as non-root), and rlim_cur can only be raised as high as
+    // rlim_max (this is consistent with Linux).
+    // TLDR; OS X 10.11 is wack.
+    if (nofile_limit.rlim_max == RLIM_INFINITY) {
+      uint32_t max_files_per_proc;
+      size_t len = sizeof(max_files_per_proc);
+      PCHECK(sysctlbyname("kern.maxfilesperproc", &max_files_per_proc, &len, nullptr, 0) == 0);
+      // Make sure no uninitialized bits are present in the result.
+      DCHECK_EQ(sizeof(max_files_per_proc), len);
+      nofile_limit.rlim_max = max_files_per_proc;
+    }
+#endif
+    if (nofile_limit.rlim_cur >= nofile_limit.rlim_max) {
+      LOG(INFO) << Substitute("Not raising process file limit of $0; it is "
+          "already as high as it can go", nofile_limit.rlim_cur);
+      return;
+    }
+    LOG(INFO) << Substitute("Raising process file limit from $0 to $1",
+                            nofile_limit.rlim_cur, nofile_limit.rlim_max);
+    nofile_limit.rlim_cur = nofile_limit.rlim_max;
+    PCHECK(setrlimit(RLIMIT_NOFILE, &nofile_limit) == 0);
+  }
+
+  // Sets '*result' to whether 'path' lives on a filesystem whose statfs type
+  // equals EXT4_SUPER_MAGIC. Always reports false on macOS.
+  virtual Status IsOnExtFilesystem(const string& path, bool* result) OVERRIDE {
+    TRACE_EVENT0("io", "PosixEnv::IsOnExtFilesystem");
+    ThreadRestrictions::AssertIOAllowed();
+
+#ifdef __APPLE__
+    *result = false;
+#else
+    struct statfs fs_stats;
+    int rc;
+    RETRY_ON_EINTR(rc, statfs(path.c_str(), &fs_stats));
+    if (rc == -1) {
+      return IOError(Substitute("statfs: $0", path), errno);
+    }
+    const bool is_ext = (fs_stats.f_type == EXT4_SUPER_MAGIC);
+    *result = is_ext;
+#endif
+    return Status::OK();
+  }
+
+  // Returns the 'release' field reported by uname(2). A uname() failure is
+  // fatal.
+  virtual string GetKernelRelease() OVERRIDE {
+    struct utsname uts;
+    // There's no reason for this to ever fail.
+    PCHECK(uname(&uts) == 0);
+    string release(uts.release);
+    return release;
+  }
+
+  // Makes sure 'path' has no permission bit set that g_parsed_umask masks out.
+  // If it does, a warning is logged and the offending bits are cleared with
+  // chmod(2). Requires g_parsed_umask to have been set (i.e. not -1).
+  Status EnsureFileModeAdheresToUmask(const string& path) override {
+    struct stat path_stat;
+    if (stat(path.c_str(), &path_stat) != 0) {
+      return IOError("stat", errno);
+    }
+    CHECK_NE(g_parsed_umask, -1);
+    if ((path_stat.st_mode & g_parsed_umask) == 0) {
+      // Already at least as restrictive as the umask; nothing to do.
+      return Status::OK();
+    }
+    uint32_t old_perms = path_stat.st_mode & ACCESSPERMS;
+    uint32_t new_perms = old_perms & ~g_parsed_umask;
+    LOG(WARNING) << "Path " << path << " has permissions "
+                 << StringPrintf("%03o", old_perms)
+                 << " which are less restrictive than current umask value "
+                 << StringPrintf("%03o", g_parsed_umask)
+                 << ": resetting permissions to "
+                 << StringPrintf("%03o", new_perms);
+    if (chmod(path.c_str(), new_perms) != 0) {
+      return IOError("chmod", errno);
+    }
+    return Status::OK();
+  }
+
+ private:
+  // Deleter that lets a unique_ptr own an FTS handle: calls fts_close() on any
+  // non-null handle.
+  struct FtsCloser {
+    void operator()(FTS *fts) const {
+      if (fts == nullptr) {
+        return;
+      }
+      fts_close(fts);
+    }
+  };
+
+  // Creates a uniquely named file from 'name_template' with mkstemp(3). On
+  // success the open descriptor is stored in '*fd' and the generated name in
+  // '*created_filename'. When the FLAGS_env_inject_io_error fault-injection
+  // hook fires, an injected IOError is returned before anything is created.
+  Status MkTmpFile(const string& name_template, int* fd, string* created_filename) {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+    ThreadRestrictions::AssertIOAllowed();
+    // mkstemp() rewrites its argument in place, so work on a private,
+    // NUL-terminated copy of the template.
+    const size_t buf_len = name_template.size() + 1;
+    unique_ptr<char[]> name_buf(new char[buf_len]);
+    ::snprintf(name_buf.get(), buf_len, "%s", name_template.c_str());
+    const int tmp_fd = mkstemp(name_buf.get());
+    if (tmp_fd < 0) {
+      return IOError(Substitute("Call to mkstemp() failed on name template $0", name_template),
+                     errno);
+    }
+    // mkstemp defaults to making files with permissions 0600. But, if the
+    // user configured a more permissive umask, then we ensure that the
+    // resulting file gets the desired (wider) permissions.
+    uint32_t desired_perms = 0666 & ~g_parsed_umask;
+    if (desired_perms != 0600) {
+      CHECK_ERR(fchmod(tmp_fd, desired_perms));
+    }
+    *fd = tmp_fd;
+    *created_filename = name_buf.get();
+    return Status::OK();
+  }
+
+  // Wraps the already-open descriptor 'fd' for 'fname' in a PosixWritableFile
+  // returned through 'result'.
+  //
+  // When opts.mode is OPEN_EXISTING the file's current size is looked up and
+  // passed to the new object; otherwise a size of 0 is passed.
+  //
+  // This function takes ownership of 'fd': on success it is handed to the
+  // PosixWritableFile, and on failure it is closed here, because none of the
+  // callers close it themselves.
+  Status InstantiateNewWritableFile(const std::string& fname,
+                                    int fd,
+                                    const WritableFileOptions& opts,
+                                    unique_ptr<WritableFile>* result) {
+    uint64_t file_size = 0;
+    if (opts.mode == OPEN_EXISTING) {
+      Status s = GetFileSize(fname, &file_size);
+      if (!s.ok()) {
+        // Don't leak the descriptor when the size lookup fails.
+        close(fd);
+        return s;
+      }
+    }
+    result->reset(new PosixWritableFile(fname, fd, file_size, opts.sync_on_close));
+    return Status::OK();
+  }
+
+  // Walk() callback used by DeleteRecursively(): deletes the file or directory
+  // named 'basename' inside 'dirname'. A failure is logged as a warning and
+  // also returned. An unknown 'type' is fatal.
+  Status DeleteRecursivelyCb(FileType type, const string& dirname, const string& basename) {
+    const string target = JoinPathSegments(dirname, basename);
+    if (type == FILE_TYPE) {
+      Status s = DeleteFile(target);
+      WARN_NOT_OK(s, "Could not delete file");
+      return s;
+    }
+    if (type == DIRECTORY_TYPE) {
+      Status s = DeleteDir(target);
+      WARN_NOT_OK(s, "Could not delete directory");
+      return s;
+    }
+    LOG(FATAL) << "Unknown file type: " << type;
+    return Status::OK();
+  }
+
+  // Walk() callback used by GetFileSizeOnDiskRecursively(): adds the on-disk
+  // size of the file 'basename' inside 'dirname' to '*bytes_used'. Directories
+  // contribute nothing. An unknown 'type' is fatal.
+  Status GetFileSizeOnDiskRecursivelyCb(uint64_t* bytes_used,
+                                        Env::FileType type,
+                                        const string& dirname,
+                                        const string& basename) {
+    if (type == Env::DIRECTORY_TYPE) {
+      // Ignore directory space consumption as it varies from filesystem to
+      // filesystem.
+      return Status::OK();
+    }
+    if (type != Env::FILE_TYPE) {
+      LOG(FATAL) << "Unknown file type: " << type;
+      return Status::OK();
+    }
+    uint64_t file_bytes_used = 0;
+    RETURN_NOT_OK(GetFileSizeOnDisk(
+        JoinPathSegments(dirname, basename), &file_bytes_used));
+    *bytes_used += file_bytes_used;
+    return Status::OK();
+  }
+};
+
+// The constructor has nothing to set up.
+PosixEnv::PosixEnv() = default;
+
+}  // namespace
+
+// State for the process-wide default Env. The PosixEnv is created on the first
+// call to Env::Default(), guarded by pthread_once(), and is never deleted here.
+static pthread_once_t default_env_once = PTHREAD_ONCE_INIT;
+static Env* default_env;
+
+static void CreateDefaultEnv() {
+  default_env = new PosixEnv;
+}
+
+// Returns the shared default Env, creating it on first use.
+Env* Env::Default() {
+  pthread_once(&default_env_once, CreateDefaultEnv);
+  return default_env;
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/env_util-test.cc b/be/src/kudu/util/env_util-test.cc
new file mode 100644
index 0000000..78bb006
--- /dev/null
+++ b/be/src/kudu/util/env_util-test.cc
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <sys/statvfs.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <memory>
+#include <unordered_set>
+
+#include <gflags/gflags.h>
+#include <glog/stl_logging.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int64(disk_reserved_bytes_free_for_testing);
+
+using std::string;
+using std::unique_ptr;
+using std::unordered_set;
+using strings::Substitute;
+
+namespace kudu {
+namespace env_util {
+
+// Fixture for the env_util tests; adds nothing on top of KuduTest.
+class EnvUtilTest : public KuduTest {};
+
+// Asserts that 's' is the error produced when a request cannot be satisfied
+// for lack of disk space: an IOError carrying ENOSPC whose text mentions
+// "Insufficient disk space".
+static void AssertNoSpace(const Status& s) {
+  ASSERT_TRUE(s.IsIOError());
+  const auto posix_code = s.posix_code();
+  ASSERT_EQ(ENOSPC, posix_code);
+  const string description = s.ToString();
+  ASSERT_STR_CONTAINS(description, "Insufficient disk space");
+}
+
+// Exercises VerifySufficientDiskSpace(): a trivially satisfiable request, the
+// 1% reservation mode, and a simulated full disk.
+TEST_F(EnvUtilTest, TestDiskSpaceCheck) {
+  const int64_t kZeroRequestedBytes = 0;
+  const int64_t kRequestOnePercentReservation = -1;
+  int64_t reserved_bytes = 0;
+  // Asking for nothing with nothing reserved must succeed.
+  ASSERT_OK(VerifySufficientDiskSpace(env_, test_dir_, kZeroRequestedBytes, reserved_bytes));
+
+  // Check 1% reservation logic. We loop this in case there are other FS
+  // operations happening concurrent with this test.
+  ASSERT_EVENTUALLY([&] {
+    SpaceInfo fs_info;
+    ASSERT_OK(env_->GetSpaceInfo(test_dir_, &fs_info));
+    // Try for 1 less byte than 1% free. This request should be rejected.
+    const int64_t free_after_request = (fs_info.capacity_bytes / 100) - 1;
+    const int64_t request_size =
+        std::max<int64_t>(0, fs_info.free_bytes - free_after_request);
+    NO_FATALS(AssertNoSpace(VerifySufficientDiskSpace(env_, test_dir_, request_size,
+                                                      kRequestOnePercentReservation)));
+  });
+
+  // Make it seem as if the disk is full and specify that we should have
+  // reserved 200 bytes. Even asking for 0 bytes should return an error
+  // indicating we are out of space.
+  FLAGS_disk_reserved_bytes_free_for_testing = 0;
+  reserved_bytes = 200;
+  NO_FATALS(AssertNoSpace(VerifySufficientDiskSpace(env_, test_dir_, kZeroRequestedBytes,
+                                                    reserved_bytes)));
+}
+
+// Verifies CreateDirsRecursively() for absolute and relative paths, for a path
+// that is blocked by a regular file, and for a path that passes through a
+// symlink.
+TEST_F(EnvUtilTest, TestCreateDirsRecursively) {
+  bool is_dir;
+
+  // Absolute path.
+  const string abs_path = JoinPathSegments(test_dir_, "a/b/c");
+  ASSERT_OK(CreateDirsRecursively(env_, abs_path));
+  ASSERT_OK(env_->IsDirectory(abs_path, &is_dir));
+  ASSERT_TRUE(is_dir);
+
+  // Repeating the previous command should also succeed (it should be a no-op).
+  ASSERT_OK(CreateDirsRecursively(env_, abs_path));
+  ASSERT_OK(env_->IsDirectory(abs_path, &is_dir));
+  ASSERT_TRUE(is_dir);
+
+  // Relative path.
+  ASSERT_OK(env_->ChangeDir(test_dir_)); // Change to test dir to keep CWD clean.
+  const string rel_base = Substitute("$0-$1", CURRENT_TEST_CASE_NAME(), CURRENT_TEST_NAME());
+  ASSERT_FALSE(env_->FileExists(rel_base));
+  const string rel_path = JoinPathSegments(rel_base, "x/y/z");
+  ASSERT_OK(CreateDirsRecursively(env_, rel_path));
+  ASSERT_OK(env_->IsDirectory(rel_path, &is_dir));
+  ASSERT_TRUE(is_dir);
+
+  // Directory creation should fail if a file is a part of the path.
+  const string blocked_path = JoinPathSegments(test_dir_, "x/y/z");
+  const string file_path = JoinPathSegments(test_dir_, "x"); // Conflicts with 'blocked_path'.
+  ASSERT_FALSE(env_->FileExists(blocked_path));
+  ASSERT_FALSE(env_->FileExists(file_path));
+  // Create an empty file in the path.
+  unique_ptr<WritableFile> out;
+  ASSERT_OK(env_->NewWritableFile(file_path, &out));
+  ASSERT_OK(out->Close());
+  ASSERT_TRUE(env_->FileExists(file_path));
+  // Fail.
+  const Status s = CreateDirsRecursively(env_, blocked_path);
+  ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "File exists");
+
+  // We should be able to create a directory tree even when a symlink exists as
+  // part of the path.
+  const string via_link_path = JoinPathSegments(test_dir_, "link/a/b");
+  const string link_path = JoinPathSegments(test_dir_, "link");
+  const string real_dir = JoinPathSegments(test_dir_, "real_dir");
+  ASSERT_OK(env_->CreateDir(real_dir));
+  PCHECK(symlink(real_dir.c_str(), link_path.c_str()) == 0);
+  ASSERT_OK(CreateDirsRecursively(env_, via_link_path));
+  ASSERT_OK(env_->IsDirectory(via_link_path, &is_dir));
+  ASSERT_TRUE(is_dir);
+}
+
+// Ensure that DeleteExcessFilesByPattern() works.
+// We ensure that the number of files remaining after running it is the number
+// expected, and we manually set the modification times on the relevant files
+// to allow us to test that files are deleted oldest-first.
+TEST_F(EnvUtilTest, TestDeleteExcessFilesByPattern) {
+  string dir = JoinPathSegments(test_dir_, "excess");
+  ASSERT_OK(env_->CreateDir(dir));
+  vector<string> filenames = {"a", "b", "c", "d"};
+  // GetCurrentTimeMicros() is, as its name says, in microseconds: divide by
+  // one million to get seconds, and keep the result in a time_t so that it is
+  // not narrowed.
+  const time_t now_sec = GetCurrentTimeMicros() / 1000000;
+  for (size_t i = 0; i < filenames.size(); i++) {
+    const string& filename = filenames[i];
+    string path = JoinPathSegments(dir, filename);
+    unique_ptr<WritableFile> file;
+    ASSERT_OK(env_->NewWritableFile(path, &file));
+    ASSERT_OK(file->Close());
+
+    // Set the last-modified time of the file. Each file is stamped two seconds
+    // later than the previous one, so "a" is the oldest and "d" the newest.
+    struct timeval target_time;
+    target_time.tv_sec = now_sec + static_cast<time_t>(i * 2);
+    target_time.tv_usec = 0;
+    struct timeval times[2] = { target_time, target_time };
+    ASSERT_EQ(0, utimes(path.c_str(), times)) << errno;
+  }
+  vector<string> children;
+  ASSERT_OK(env_->GetChildren(dir, &children));
+  ASSERT_EQ(6, children.size()); // 4 files plus "." and "..".
+  ASSERT_OK(DeleteExcessFilesByPattern(env_, dir + "/*", 2));
+  ASSERT_OK(env_->GetChildren(dir, &children));
+  ASSERT_EQ(4, children.size()); // 2 files plus "." and "..".
+  // Only the two newest files should have survived.
+  unordered_set<string> children_set(children.begin(), children.end());
+  unordered_set<string> expected_set({".", "..", "c", "d"});
+  ASSERT_EQ(expected_set, children_set) << children;
+}
+
+} // namespace env_util
+} // namespace kudu


Mime
View raw message