ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [22/24] ignite git commit: IGNITE-3276: IGFS: Reworked output stream.
Date Fri, 10 Jun 2016 06:10:44 GMT
IGNITE-3276: IGFS: Reworked output stream.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ec5706ff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ec5706ff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ec5706ff

Branch: refs/heads/ignite-3038
Commit: ec5706fff899b71c8f3a15a1476744004f876c6e
Parents: ec887a3
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Wed Jun 8 14:41:49 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed Jun 8 14:41:49 2016 +0300

----------------------------------------------------------------------
 .../configuration/FileSystemConfiguration.java  |  34 ++
 .../processors/igfs/IgfsDataManager.java        |  28 +-
 .../internal/processors/igfs/IgfsImpl.java      |  45 +-
 .../processors/igfs/IgfsMetaManager.java        | 442 ++++++++-------
 .../igfs/IgfsOutputStreamAdapter.java           | 265 ---------
 .../processors/igfs/IgfsOutputStreamImpl.java   | 558 +++++++++++--------
 .../igfs/meta/IgfsMetaFileUnlockProcessor.java  |  69 ++-
 .../processors/igfs/IgfsAbstractSelfTest.java   |  12 +-
 .../igfs/IgfsDataManagerSelfTest.java           |  12 +-
 .../igfs/IgfsProcessorValidationSelfTest.java   |  38 +-
 .../processors/igfs/IgfsTaskSelfTest.java       |   2 +-
 11 files changed, 733 insertions(+), 772 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5706ff/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index 625ba95..074636a 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.ignite.configuration;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
 import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsOutputStream;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -91,6 +92,9 @@ public class FileSystemConfiguration {
     /** Default value of relaxed consistency flag. */
     public static final boolean DFLT_RELAXED_CONSISTENCY = true;
 
+    /** Default value of update file length on flush flag. */
+    public static final boolean DFLT_UPDATE_FILE_LEN_ON_FLUSH = false;
+
     /** IGFS instance name. */
     private String name;
 
@@ -178,6 +182,9 @@ public class FileSystemConfiguration {
     /** Relaxed consistency flag. */
     private boolean relaxedConsistency = DFLT_RELAXED_CONSISTENCY;
 
+    /** Update file length on flush flag. */
+    private boolean updateFileLenOnFlush = DFLT_UPDATE_FILE_LEN_ON_FLUSH;
+
     /**
      * Constructs default configuration.
      */
@@ -225,6 +232,7 @@ public class FileSystemConfiguration {
         relaxedConsistency = cfg.isRelaxedConsistency();
         seqReadsBeforePrefetch = cfg.getSequentialReadsBeforePrefetch();
         trashPurgeTimeout = cfg.getTrashPurgeTimeout();
+        updateFileLenOnFlush = cfg.isUpdateFileLengthOnFlush();
     }
 
     /**
@@ -922,6 +930,32 @@ public class FileSystemConfiguration {
         this.relaxedConsistency = relaxedConsistency;
     }
 
+    /**
+     * Get whether to update file length on flush.
+     * <p>
+     * Controls whether to update file length or not when {@link IgfsOutputStream#flush()} method is invoked. You
+     * may want to set this property to true in case you want to read from a file which is being written at the
+     * same time.
+     * <p>
+     * Defaults to {@link #DFLT_UPDATE_FILE_LEN_ON_FLUSH}.
+     *
+     * @return Whether to update file length on flush.
+     */
+    public boolean isUpdateFileLengthOnFlush() {
+        return updateFileLenOnFlush;
+    }
+
+    /**
+     * Set whether to update file length on flush.
+     * <p>
+     * See {@link #isUpdateFileLengthOnFlush()} for more information.
+     *
+     * @param updateFileLenOnFlush Whether to update file length on flush.
+     */
+    public void setUpdateFileLengthOnFlush(boolean updateFileLenOnFlush) {
+        this.updateFileLenOnFlush = updateFileLenOnFlush;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(FileSystemConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5706ff/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 57a8c6c..34d77f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -473,18 +473,18 @@ public class IgfsDataManager extends IgfsManager {
     /**
      * Registers write future in igfs data manager.
      *
-     * @param fileInfo File info of file opened to write.
+     * @param fileId File ID.
      * @return Future that will be completed when all ack messages are received or when write failed.
      */
-    public IgniteInternalFuture<Boolean> writeStart(IgfsEntryInfo fileInfo) {
-        WriteCompletionFuture fut = new WriteCompletionFuture(fileInfo.id());
+    public IgniteInternalFuture<Boolean> writeStart(IgniteUuid fileId) {
+        WriteCompletionFuture fut = new WriteCompletionFuture(fileId);
 
-        WriteCompletionFuture oldFut = pendingWrites.putIfAbsent(fileInfo.id(), fut);
+        WriteCompletionFuture oldFut = pendingWrites.putIfAbsent(fileId, fut);
 
-        assert oldFut == null : "Opened write that is being concurrently written: " + fileInfo;
+        assert oldFut == null : "Opened write that is being concurrently written: " + fileId;
 
         if (log.isDebugEnabled())
-            log.debug("Registered write completion future for file output stream [fileInfo=" + fileInfo +
+            log.debug("Registered write completion future for file output stream [fileId=" + fileId +
                 ", fut=" + fut + ']');
 
         return fut;
@@ -493,17 +493,23 @@ public class IgfsDataManager extends IgfsManager {
     /**
      * Notifies data manager that no further writes will be performed on stream.
      *
-     * @param fileInfo File info being written.
+     * @param fileId File ID.
+     * @param await Await completion.
+     * @throws IgniteCheckedException If failed.
      */
-    public void writeClose(IgfsEntryInfo fileInfo) {
-        WriteCompletionFuture fut = pendingWrites.get(fileInfo.id());
+    public void writeClose(IgniteUuid fileId, boolean await) throws IgniteCheckedException {
+        WriteCompletionFuture fut = pendingWrites.get(fileId);
 
-        if (fut != null)
+        if (fut != null) {
             fut.markWaitingLastAck();
+
+            if (await)
+                fut.get();
+        }
         else {
             if (log.isDebugEnabled())
                 log.debug("Failed to find write completion future for file in pending write map (most likely it was " +
-                    "failed): " + fileInfo);
+                    "failed): " + fileId);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5706ff/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 262dfef..bc26ace 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -1051,7 +1051,7 @@ public final class IgfsImpl implements IgfsEx {
 
                     batch = newBatch(path, desc.out());
 
-                    IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, desc.info(),
+                    IgfsOutputStreamImpl os = new IgfsOutputStreamImpl(igfsCtx, path, desc.info(),
                         bufferSize(bufSize), mode, batch);
 
                     IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
@@ -1081,7 +1081,7 @@ public final class IgfsImpl implements IgfsEx {
 
                 assert res != null;
 
-                return new IgfsEventAwareOutputStream(path, res, bufferSize(bufSize), mode, null);
+                return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
             }
         });
     }
@@ -1116,7 +1116,7 @@ public final class IgfsImpl implements IgfsEx {
 
                     batch = newBatch(path, desc.out());
 
-                    return new IgfsEventAwareOutputStream(path, desc.info(), bufferSize(bufSize), mode, batch);
+                    return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch);
                 }
 
                 final List<IgniteUuid> ids = meta.idsForPath(path);
@@ -1157,7 +1157,7 @@ public final class IgfsImpl implements IgfsEx {
 
                 assert res != null;
 
-                return new IgfsEventAwareOutputStream(path, res, bufferSize(bufSize), mode, null);
+                return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
             }
         });
     }
@@ -1680,43 +1680,6 @@ public final class IgfsImpl implements IgfsEx {
     }
 
     /**
-     * IGFS output stream extension that fires events.
-     */
-    private class IgfsEventAwareOutputStream extends IgfsOutputStreamImpl {
-        /** Close guard. */
-        private final AtomicBoolean closeGuard = new AtomicBoolean(false);
-
-        /**
-         * Constructs file output stream.
-         *
-         * @param path Path to stored file.
-         * @param fileInfo File info.
-         * @param bufSize The size of the buffer to be used.
-         * @param mode IGFS mode.
-         * @param batch Optional secondary file system batch.
-         */
-        IgfsEventAwareOutputStream(IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
-            @Nullable IgfsFileWorkerBatch batch) {
-            super(igfsCtx, path, fileInfo, bufSize, mode, batch, metrics);
-
-            metrics.incrementFilesOpenedForWrite();
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
-        @Override protected void onClose() throws IOException {
-            if (closeGuard.compareAndSet(false, true)) {
-                super.onClose();
-
-                metrics.decrementFilesOpenedForWrite();
-
-                if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
-                    evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_CLOSED_WRITE, bytes()));
-            }
-        }
-    }
-
-    /**
      * IGFS input stream extension that fires events.
      */
     private class IgfsEventAwareInputStream extends IgfsInputStreamImpl {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5706ff/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 465116b..91bb334 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -280,24 +279,6 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
-     * Return nodes where meta cache is defined.
-     *
-     * @return Nodes where meta cache is defined.
-     */
-    Collection<ClusterNode> metaCacheNodes() {
-        if (busyLock.enterBusy()) {
-            try {
-                return igfsCtx.kernalContext().discovery().cacheNodes(metaCache.name(), AffinityTopologyVersion.NONE);
-            }
-            finally {
-                busyLock.leaveBusy();
-            }
-        }
-        else
-            throw new IllegalStateException("Failed to get meta cache nodes because Grid is stopping.");
-    }
-
-    /**
      * Gets file ID for specified path.
      *
      * @param path Path.
@@ -630,21 +611,36 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
-     * Remove explicit lock on file held by the current thread.
+     * Remove explicit lock on file held by the current stream.
      *
-     * @param info File info to unlock.
+     * @param fileId File ID.
+     * @param lockId Lock ID.
      * @param modificationTime Modification time to write to file info.
      * @throws IgniteCheckedException If failed.
      */
-    public void unlock(final IgfsEntryInfo info, final long modificationTime) throws IgniteCheckedException {
-        validTxState(false);
+    public void unlock(final IgniteUuid fileId, final IgniteUuid lockId, final long modificationTime)
+        throws IgniteCheckedException {
+        unlock(fileId, lockId, modificationTime, false, 0, null);
+    }
 
-        assert info != null;
+    /**
+     * Remove explicit lock on file held by the current stream.
+     *
+     * @param fileId File ID.
+     * @param lockId Lock ID.
+     * @param modificationTime Modification time to write to file info.
+     * @param updateSpace Whether to update space.
+     * @param space Space.
+     * @param affRange Affinity range.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void unlock(final IgniteUuid fileId, final IgniteUuid lockId, final long modificationTime,
+        final boolean updateSpace, final long space, @Nullable final IgfsFileAffinityRange affRange)
+        throws IgniteCheckedException {
+        validTxState(false);
 
         if (busyLock.enterBusy()) {
             try {
-                final IgniteUuid lockId = info.lockId();
-
                 if (lockId == null)
                     return;
 
@@ -656,8 +652,6 @@ public class IgfsMetaManager extends IgfsManager {
                         @Override public Void applyx() throws IgniteCheckedException {
                             validTxState(true);
 
-                            IgniteUuid fileId = info.id();
-
                             // Lock file ID for this transaction.
                             IgfsEntryInfo oldInfo = info(fileId);
 
@@ -665,12 +659,13 @@ public class IgfsMetaManager extends IgfsManager {
                                 throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not " +
                                     "found): " + fileId));
 
-                            if (!F.eq(info.lockId(), oldInfo.lockId()))
+                            if (!F.eq(lockId, oldInfo.lockId()))
                                 throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) " +
-                                    "[fileId=" + fileId + ", lockId=" + info.lockId() + ", actualLockId=" +
+                                    "[fileId=" + fileId + ", lockId=" + lockId + ", actualLockId=" +
                                     oldInfo.lockId() + ']');
 
-                            id2InfoPrj.invoke(fileId, new IgfsMetaFileUnlockProcessor(modificationTime));
+                            id2InfoPrj.invoke(fileId,
+                                new IgfsMetaFileUnlockProcessor(modificationTime, updateSpace, space, affRange));
 
                             return null;
                         }
@@ -688,7 +683,7 @@ public class IgfsMetaManager extends IgfsManager {
             }
         }
         else
-            throw new IllegalStateException("Failed to unlock file system entry because Grid is stopping: " + info);
+            throw new IllegalStateException("Failed to unlock file system entry because Grid is stopping: " + fileId);
     }
 
     /**
@@ -1449,38 +1444,6 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
-     * Check whether there are any pending deletes and return collection of pending delete entry IDs.
-     *
-     * @return Collection of entry IDs to be deleted.
-     * @throws IgniteCheckedException If operation failed.
-     */
-    public Collection<IgniteUuid> pendingDeletes() throws IgniteCheckedException {
-        if (busyLock.enterBusy()) {
-            try {
-                Collection<IgniteUuid> ids = new HashSet<>();
-
-                for (int i = 0; i < IgfsUtils.TRASH_CONCURRENCY; i++) {
-                    IgniteUuid trashId = IgfsUtils.trashId(i);
-
-                    IgfsEntryInfo trashInfo = getInfo(trashId);
-
-                    if (trashInfo != null && trashInfo.hasChildren()) {
-                        for (IgfsListingEntry entry : trashInfo.listing().values())
-                            ids.add(entry.fileId());
-                    }
-                }
-
-                return ids;
-            }
-            finally {
-                busyLock.leaveBusy();
-            }
-        }
-        else
-            throw new IllegalStateException("Failed to get pending deletes because Grid is stopping.");
-    }
-
-    /**
      * Update file info (file properties) in cache in existing transaction.
      *
      * @param fileId File ID to update information for.
@@ -1545,27 +1508,26 @@ public class IgfsMetaManager extends IgfsManager {
     /**
      * Reserve space for file.
      *
-     * @param path File path.
      * @param fileId File ID.
      * @param space Space.
      * @param affRange Affinity range.
      * @return New file info.
      */
-    public IgfsEntryInfo reserveSpace(IgfsPath path, IgniteUuid fileId, long space, IgfsFileAffinityRange affRange)
+    public IgfsEntryInfo reserveSpace(IgniteUuid fileId, long space, IgfsFileAffinityRange affRange)
         throws IgniteCheckedException {
         validTxState(false);
 
         if (busyLock.enterBusy()) {
             try {
                 if (log.isDebugEnabled())
-                    log.debug("Reserve file space [path=" + path + ", id=" + fileId + ']');
+                    log.debug("Reserve file space: " + fileId);
 
                 try (IgniteInternalTx tx = startTx()) {
                     // Lock file ID for this transaction.
                     IgfsEntryInfo oldInfo = info(fileId);
 
                     if (oldInfo == null)
-                        throw fsException("File has been deleted concurrently [path=" + path + ", id=" + fileId + ']');
+                        throw fsException("File has been deleted concurrently: " + fileId);
 
                     IgfsEntryInfo newInfo =
                         invokeAndGet(fileId, new IgfsMetaFileReserveSpaceProcessor(space, affRange));
@@ -1583,8 +1545,7 @@ public class IgfsMetaManager extends IgfsManager {
             }
         }
         else
-            throw new IllegalStateException("Failed to reseve file space because Grid is stopping [path=" + path +
-                ", id=" + fileId + ']');
+            throw new IllegalStateException("Failed to reserve file space because Grid is stopping:" + fileId);
     }
 
     /**
@@ -1882,121 +1843,8 @@ public class IgfsMetaManager extends IgfsManager {
                 // Events to fire (can be done outside of a transaction).
                 final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
 
-                SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
-                    new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
-                        /** Output stream to the secondary file system. */
-                        private OutputStream out;
-
-                        @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
-                            IgfsEntryInfo> infos) throws Exception {
-                            validTxState(true);
-
-                            assert !infos.isEmpty();
-
-                            // Determine the first existing parent.
-                            IgfsPath parentPath = null;
-
-                            for (IgfsPath curPath : infos.keySet()) {
-                                if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
-                                    parentPath = curPath;
-                            }
-
-                            assert parentPath != null;
-
-                            IgfsEntryInfo parentInfo = infos.get(parentPath);
-
-                            // Delegate to the secondary file system.
-                            out = simpleCreate ? fs.create(path, overwrite) :
-                                fs.create(path, bufSize, overwrite, replication, blockSize, props);
-
-                            IgfsPath parent0 = path.parent();
-
-                            assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
-
-                            // If some of the parent directories were missing, synchronize again.
-                            if (!parentPath.equals(parent0)) {
-                                parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
-
-                                // Fire notification about missing directories creation.
-                                if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
-                                    IgfsPath evtPath = parent0;
-
-                                    while (!parentPath.equals(evtPath)) {
-                                        pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
-                                            EventType.EVT_IGFS_DIR_CREATED));
-
-                                        evtPath = evtPath.parent();
-
-                                        assert evtPath != null; // If this fails, then ROOT does not exist.
-                                    }
-                                }
-                            }
-
-                            // Get created file info.
-                            IgfsFile status = fs.info(path);
-
-                            if (status == null)
-                                throw fsException("Failed to open output stream to the file created in " +
-                                    "the secondary file system because it no longer exists: " + path);
-                            else if (status.isDirectory())
-                                throw fsException("Failed to open output stream to the file created in " +
-                                    "the secondary file system because the path points to a directory: " + path);
-
-                            IgfsEntryInfo newInfo = IgfsUtils.createFile(
-                                IgniteUuid.randomUuid(),
-                                status.blockSize(),
-                                status.length(),
-                                affKey,
-                                createFileLockId(false),
-                                igfsCtx.igfs().evictExclude(path, false),
-                                status.properties(),
-                                status.accessTime(),
-                                status.modificationTime()
-                            );
-
-                            // Add new file info to the listing optionally removing the previous one.
-                            assert parentInfo != null;
-
-                            IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
-
-                            if (oldId != null) {
-                                IgfsEntryInfo oldInfo = info(oldId);
-
-                                assert oldInfo != null; // Otherwise cache is in inconsistent state.
-
-                                // The contact is that we cannot overwrite a file locked for writing:
-                                if (oldInfo.lockId() != null)
-                                    throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
-                                        path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
-
-                                id2InfoPrj.remove(oldId); // Remove the old one.
-                                id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
-                                    path.name(), parentInfo.listing().get(path.name()).fileId()));
-
-                                createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
-
-                                igfsCtx.data().delete(oldInfo);
-                            }
-
-                            // Record CREATE event if needed.
-                            if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
-                                pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
-
-                            return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
-                        }
-
-                        @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err)
-                            throws IgniteCheckedException {
-                            U.closeQuiet(out);
-
-                            U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
-                                simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
-                                bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
-
-                            throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
-                                "exception: " + path, err);
-                        }
-                    };
+                CreateFileSynchronizationTask task = new CreateFileSynchronizationTask(fs, path, simpleCreate, props,
+                    overwrite, bufSize, replication, blockSize, affKey, pendingEvts);
 
                 try {
                     return synchronizeAndExecute(task, fs, false, path.parent());
@@ -2956,29 +2804,6 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
-     * Synchronization task interface.
-     */
-    private static interface SynchronizationTask<T> {
-        /**
-         * Callback handler in case synchronization was successful.
-         *
-         * @param infos Map from paths to corresponding infos.
-         * @return Task result.
-         * @throws Exception If failed.
-         */
-        public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
-
-        /**
-         * Callback handler in case synchronization failed.
-         *
-         * @param err Optional exception.
-         * @return Task result.
-         * @throws IgniteCheckedException In case exception is to be thrown in that case.
-         */
-        public T onFailure(Exception err) throws IgniteCheckedException;
-    }
-
-    /**
      * Append routine.
      *
      * @param path Path.
@@ -3352,4 +3177,205 @@ public class IgfsMetaManager extends IgfsManager {
         if (delWorker0 != null)
             delWorker0.signal();
     }
+
+    /**
+     * Synchronization task interface.
+     */
+    private static interface SynchronizationTask<T> {
+        /**
+         * Callback handler in case synchronization was successful.
+         *
+         * @param infos Map from paths to corresponding infos.
+         * @return Task result.
+         * @throws Exception If failed.
+         */
+        public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
+
+        /**
+         * Callback handler in case synchronization failed.
+         *
+         * @param err Optional exception.
+         * @return Task result.
+         * @throws IgniteCheckedException In case exception is to be thrown in that case.
+         */
+        public T onFailure(Exception err) throws IgniteCheckedException;
+    }
+
+    /**
+     * Synchronization task to create a file.
+     */
+    private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> {
+        /** Secondary file system. */
+        private IgfsSecondaryFileSystem fs;
+
+        /** Path. */
+        private IgfsPath path;
+
+        /** Simple create flag. */
+        private boolean simpleCreate;
+
+        /** Properties. */
+        private Map<String, String> props;
+
+        /** Overwrite flag. */
+        private boolean overwrite;
+
+        /** Buffer size. */
+        private int bufSize;
+
+        /** Replication factor. */
+        private short replication;
+
+        /** Block size. */
+        private long blockSize;
+
+        /** Affinity key. */
+        private IgniteUuid affKey;
+
+        /** Pending events. */
+        private Deque<IgfsEvent> pendingEvts;
+
+        /** Output stream to the secondary file system. */
+        private OutputStream out;
+
+        /**
+         * Constructor.
+         *
+         * @param fs Secondary file system.
+         * @param path Path.
+         * @param simpleCreate Simple create flag.
+         * @param props Properties.
+         * @param overwrite Overwrite flag.
+         * @param bufSize Buffer size.
+         * @param replication Replication factor.
+         * @param blockSize Block size.
+         * @param affKey Affinity key.
+         * @param pendingEvts Pending events.
+         */
+        public CreateFileSynchronizationTask(IgfsSecondaryFileSystem fs, IgfsPath path, boolean simpleCreate,
+            @Nullable Map<String, String> props, boolean overwrite, int bufSize, short replication, long blockSize,
+            IgniteUuid affKey, Deque<IgfsEvent> pendingEvts) {
+            this.fs = fs;
+            this.path = path;
+            this.simpleCreate = simpleCreate;
+            this.props = props;
+            this.overwrite = overwrite;
+            this.bufSize = bufSize;
+            this.replication = replication;
+            this.blockSize = blockSize;
+            this.affKey = affKey;
+            this.pendingEvts = pendingEvts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
+            throws Exception {
+            validTxState(true);
+
+            assert !infos.isEmpty();
+
+            // Determine the first existing parent.
+            IgfsPath parentPath = null;
+
+            for (IgfsPath curPath : infos.keySet()) {
+                if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
+                    parentPath = curPath;
+            }
+
+            assert parentPath != null;
+
+            IgfsEntryInfo parentInfo = infos.get(parentPath);
+
+            // Delegate to the secondary file system.
+            out = simpleCreate ? fs.create(path, overwrite) :
+                fs.create(path, bufSize, overwrite, replication, blockSize, props);
+
+            IgfsPath parent0 = path.parent();
+
+            assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
+
+            // If some of the parent directories were missing, synchronize again.
+            if (!parentPath.equals(parent0)) {
+                parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
+
+                // Fire notification about missing directories creation.
+                if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
+                    IgfsPath evtPath = parent0;
+
+                    while (!parentPath.equals(evtPath)) {
+                        pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
+                            EventType.EVT_IGFS_DIR_CREATED));
+
+                        evtPath = evtPath.parent();
+
+                        assert evtPath != null; // If this fails, then ROOT does not exist.
+                    }
+                }
+            }
+
+            // Get created file info.
+            IgfsFile status = fs.info(path);
+
+            if (status == null)
+                throw fsException("Failed to open output stream to the file created in " +
+                    "the secondary file system because it no longer exists: " + path);
+            else if (status.isDirectory())
+                throw fsException("Failed to open output stream to the file created in " +
+                    "the secondary file system because the path points to a directory: " + path);
+
+            IgfsEntryInfo newInfo = IgfsUtils.createFile(
+                IgniteUuid.randomUuid(),
+                status.blockSize(),
+                status.length(),
+                affKey,
+                createFileLockId(false),
+                igfsCtx.igfs().evictExclude(path, false),
+                status.properties(),
+                status.accessTime(),
+                status.modificationTime()
+            );
+
+            // Add new file info to the listing optionally removing the previous one.
+            assert parentInfo != null;
+
+            IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
+
+            if (oldId != null) {
+                IgfsEntryInfo oldInfo = info(oldId);
+
+                assert oldInfo != null; // Otherwise cache is in inconsistent state.
+
+                // The contract is that we cannot overwrite a file locked for writing:
+                if (oldInfo.lockId() != null)
+                    throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
+                        path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
+
+                id2InfoPrj.remove(oldId); // Remove the old one.
+                id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
+                    path.name(), parentInfo.listing().get(path.name()).fileId()));
+
+                createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
+
+                igfsCtx.data().delete(oldInfo);
+            }
+
+            // Record CREATE event if needed.
+            if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
+                pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
+
+            return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException {
+            U.closeQuiet(out);
+
+            U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
+                simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
+                bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
+
+            throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
+                "exception: " + path, err);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5706ff/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
deleted file mode 100644
index 43de61e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsOutputStream;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Output stream to store data into grid cache with separate blocks.
- */
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-abstract class IgfsOutputStreamAdapter extends IgfsOutputStream {
-    /** Path to file. */
-    protected final IgfsPath path;
-
-    /** Buffer size. */
-    private final int bufSize;
-
-    /** Flag for this stream open/closed state. */
-    private boolean closed;
-
-    /** Local buffer to store stream data as consistent block. */
-    private ByteBuffer buf;
-
-    /** Bytes written. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    protected long bytes;
-
-    /** Time consumed by write operations. */
-    protected long time;
-
-    /**
-     * Constructs file output stream.
-     *
-     * @param path Path to stored file.
-     * @param bufSize The size of the buffer to be used.
-     */
-    IgfsOutputStreamAdapter(IgfsPath path, int bufSize) {
-        assert path != null;
-        assert bufSize > 0;
-
-        this.path = path;
-        this.bufSize = bufSize;
-    }
-
-    /**
-     * Gets number of written bytes.
-     *
-     * @return Written bytes.
-     */
-    public long bytes() {
-        return bytes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(int b) throws IOException {
-        checkClosed(null, 0);
-
-        long startTime = System.nanoTime();
-
-        b &= 0xFF;
-
-        if (buf == null)
-            buf = ByteBuffer.allocate(bufSize);
-
-        buf.put((byte)b);
-
-        if (buf.position() >= bufSize)
-            sendData(true); // Send data to server.
-
-        time += System.nanoTime() - startTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
-        A.notNull(b, "b");
-
-        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
-            throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
-                ", length=" + len + ']');
-        }
-
-        checkClosed(null, 0);
-
-        if (len == 0)
-            return; // Done.
-
-        long startTime = System.nanoTime();
-
-        if (buf == null) {
-            // Do not allocate and copy byte buffer if will send data immediately.
-            if (len >= bufSize) {
-                buf = ByteBuffer.wrap(b, off, len);
-
-                sendData(false);
-
-                return;
-            }
-
-            buf = ByteBuffer.allocate(Math.max(bufSize, len));
-        }
-
-        if (buf.remaining() < len)
-            // Expand buffer capacity, if remaining size is less then data size.
-            buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
-
-        assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
-            "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
-
-        buf.put(b, off, len);
-
-        if (buf.position() >= bufSize)
-            sendData(true); // Send data to server.
-
-        time += System.nanoTime() - startTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
-        checkClosed(in, len);
-
-        long startTime = System.nanoTime();
-
-        // Send all IPC data from the local buffer before streaming.
-        if (buf != null && buf.position() > 0)
-            sendData(true);
-
-        try {
-            storeDataBlocks(in, len);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException(e.getMessage(), e);
-        }
-
-        time += System.nanoTime() - startTime;
-    }
-
-    /**
-     * Flushes this output stream and forces any buffered output bytes to be written out.
-     *
-     * @exception IOException  if an I/O error occurs.
-     */
-    @Override public synchronized void flush() throws IOException {
-        checkClosed(null, 0);
-
-        // Send all IPC data from the local buffer.
-        if (buf != null && buf.position() > 0)
-            sendData(true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public final synchronized void close() throws IOException {
-        // Do nothing if stream is already closed.
-        if (closed)
-            return;
-
-        try {
-            // Send all IPC data from the local buffer.
-            try {
-                flush();
-            }
-            finally {
-                onClose(); // "onClose()" routine must be invoked anyway!
-            }
-        }
-        finally {
-            // Mark this stream closed AFTER flush.
-            closed = true;
-        }
-    }
-
-    /**
-     * Store data blocks in file.<br/>
-     * Note! If file concurrently deleted we'll get lost blocks.
-     *
-     * @param data Data to store.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected abstract void storeDataBlock(ByteBuffer data) throws IgniteCheckedException, IOException;
-
-    /**
-     * Store data blocks in file reading appropriate number of bytes from given data input.
-     *
-     * @param in Data input to read from.
-     * @param len Data length to store.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected abstract void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException;
-
-    /**
-     * Close callback. It will be called only once in synchronized section.
-     *
-     * @throws IOException If failed.
-     */
-    protected void onClose() throws IOException {
-        // No-op.
-    }
-
-    /**
-     * Validate this stream is open.
-     *
-     * @throws IOException If this stream is closed.
-     */
-    private void checkClosed(@Nullable DataInput in, int len) throws IOException {
-        assert Thread.holdsLock(this);
-
-        if (closed) {
-            // Must read data from stream before throwing exception.
-            if (in != null)
-                in.skipBytes(len);
-
-            throw new IOException("Stream has been closed: " + this);
-        }
-    }
-
-    /**
-     * Send all local-buffered data to server.
-     *
-     * @param flip Whether to flip buffer on sending data. We do not want to flip it if sending wrapped
-     *      byte array.
-     * @throws IOException In case of IO exception.
-     */
-    private void sendData(boolean flip) throws IOException {
-        assert Thread.holdsLock(this);
-
-        try {
-            if (flip)
-                buf.flip();
-
-            storeDataBlock(buf);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to store data into file: " + path, e);
-        }
-
-        buf = null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsOutputStreamAdapter.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5706ff/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 21e5fb6..7741a25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -18,12 +18,13 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.IgfsEvent;
 import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsOutputStream;
 import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathNotFoundException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -32,8 +33,8 @@ import org.jetbrains.annotations.Nullable;
 import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
 import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
@@ -41,25 +42,44 @@ import static org.apache.ignite.igfs.IgfsMode.PROXY;
 /**
  * Output stream to store data into grid cache with separate blocks.
  */
-class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
+class IgfsOutputStreamImpl extends IgfsOutputStream {
     /** Maximum number of blocks in buffer. */
     private static final int MAX_BLOCKS_CNT = 16;
 
     /** IGFS context. */
-    private IgfsContext igfsCtx;
+    private final IgfsContext igfsCtx;
 
-    /** Meta info manager. */
-    private final IgfsMetaManager meta;
+    /** Path to file. */
+    private final IgfsPath path;
 
-    /** Data manager. */
-    private final IgfsDataManager data;
+    /** Buffer size. */
+    private final int bufSize;
+
+    /** IGFS mode. */
+    private final IgfsMode mode;
+
+    /** File worker batch. */
+    private final IgfsFileWorkerBatch batch;
+
+    /** Mutex for synchronization. */
+    private final Object mux = new Object();
+
+    /** Flag for this stream open/closed state. */
+    private boolean closed;
+
+    /** Local buffer to store stream data as consistent block. */
+    private ByteBuffer buf;
+
+    /** Bytes written. */
+    private long bytes;
+
+    /** Time consumed by write operations. */
+    private long time;
 
     /** File descriptor. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private IgfsEntryInfo fileInfo;
 
     /** Space in file to write data. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private long space;
 
     /** Intermediate remainder to keep data. */
@@ -68,21 +88,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
     /** Data length in remainder. */
     private int remainderDataLen;
 
-    /** Write completion future. */
-    private final IgniteInternalFuture<Boolean> writeCompletionFut;
-
-    /** IGFS mode. */
-    private final IgfsMode mode;
-
-    /** File worker batch. */
-    private final IgfsFileWorkerBatch batch;
-
-    /** Ensures that onClose)_ routine is called no more than once. */
-    private final AtomicBoolean onCloseGuard = new AtomicBoolean();
-
-    /** Local IGFS metrics. */
-    private final IgfsLocalMetrics metrics;
-
     /** Affinity written by this output stream. */
     private IgfsFileAffinityRange streamRange;
 
@@ -95,259 +100,234 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
      * @param bufSize The size of the buffer to be used.
      * @param mode Grid IGFS mode.
      * @param batch Optional secondary file system batch.
-     * @param metrics Local IGFS metrics.
      */
     IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
-        @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
-        super(path, optimizeBufferSize(bufSize, fileInfo));
-
-        assert fileInfo != null;
-        assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
-        assert mode != null && mode != PROXY;
-        assert mode == PRIMARY && batch == null || batch != null;
-        assert metrics != null;
+        @Nullable IgfsFileWorkerBatch batch) {
+        assert fileInfo != null && fileInfo.isFile() : "Unexpected file info: " + fileInfo;
+        assert mode != null && mode != PROXY && (mode == PRIMARY && batch == null || batch != null);
 
         // File hasn't been locked.
         if (fileInfo.lockId() == null)
             throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
 
-        assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
+        synchronized (mux) {
+            this.path = path;
+            this.bufSize = optimizeBufferSize(bufSize, fileInfo);
+            this.igfsCtx = igfsCtx;
+            this.fileInfo = fileInfo;
+            this.mode = mode;
+            this.batch = batch;
 
-        this.igfsCtx = igfsCtx;
-        meta = igfsCtx.meta();
-        data = igfsCtx.data();
+            streamRange = initialStreamRange(fileInfo);
 
-        this.fileInfo = fileInfo;
-        this.mode = mode;
-        this.batch = batch;
-        this.metrics = metrics;
-
-        streamRange = initialStreamRange(fileInfo);
+            igfsCtx.data().writeStart(fileInfo.id());
+        }
 
-        writeCompletionFut = data.writeStart(fileInfo);
+        igfsCtx.igfs().localMetrics().incrementFilesOpenedForWrite();
     }
 
-    /**
-     * Optimize buffer size.
-     *
-     * @param bufSize Requested buffer size.
-     * @param fileInfo File info.
-     * @return Optimized buffer size.
-     */
-    @SuppressWarnings("IfMayBeConditional")
-    private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) {
-        assert bufSize > 0;
-
-        if (fileInfo == null)
-            return bufSize;
-
-        int blockSize = fileInfo.blockSize();
+    /** {@inheritDoc} */
+    @Override public void write(int b) throws IOException {
+        synchronized (mux) {
+            checkClosed(null, 0);
 
-        if (blockSize <= 0)
-            return bufSize;
+            b &= 0xFF;
 
-        if (bufSize <= blockSize)
-            // Optimize minimum buffer size to be equal file's block size.
-            return blockSize;
+            long startTime = System.nanoTime();
 
-        int maxBufSize = blockSize * MAX_BLOCKS_CNT;
+            if (buf == null)
+                buf = allocateNewBuffer();
 
-        if (bufSize > maxBufSize)
-            // There is no profit or optimization from larger buffers.
-            return maxBufSize;
+            buf.put((byte)b);
 
-        if (fileInfo.length() == 0)
-            // Make buffer size multiple of block size (optimized for new files).
-            return bufSize / blockSize * blockSize;
+            sendBufferIfFull();
 
-        return bufSize;
+            time += System.nanoTime() - startTime;
+        }
     }
 
     /** {@inheritDoc} */
-    @Override protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
-        int writeLen = block.remaining();
+    @SuppressWarnings("NullableProblems")
+    @Override public void write(byte[] b, int off, int len) throws IOException {
+        A.notNull(b, "b");
 
-        preStoreDataBlocks(null, writeLen);
+        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
+                ", length=" + len + ']');
+        }
 
-        int blockSize = fileInfo.blockSize();
+        synchronized (mux) {
+            checkClosed(null, 0);
+
+            // Check if there is anything to write.
+            if (len == 0)
+                return;
 
-        // If data length is not enough to fill full block, fill the remainder and return.
-        if (remainderDataLen + writeLen < blockSize) {
-            if (remainder == null)
-                remainder = new byte[blockSize];
-            else if (remainder.length != blockSize) {
-                assert remainderDataLen == remainder.length;
+            long startTime = System.nanoTime();
 
-                byte[] allocated = new byte[blockSize];
+            if (buf == null) {
+                if (len >= bufSize) {
+                    // Send data right away.
+                    ByteBuffer tmpBuf = ByteBuffer.wrap(b, off, len);
 
-                U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+                    send(tmpBuf, tmpBuf.remaining());
+                }
+                else {
+                    buf = allocateNewBuffer();
 
-                remainder = allocated;
+                    buf.put(b, off, len);
+                }
             }
+            else {
+                // Re-allocate buffer if needed.
+                if (buf.remaining() < len)
+                    buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
 
-            block.get(remainder, remainderDataLen, writeLen);
+                buf.put(b, off, len);
 
-            remainderDataLen += writeLen;
-        }
-        else {
-            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
-                false, streamRange, batch);
+                sendBufferIfFull();
+            }
 
-            remainderDataLen = remainder == null ? 0 : remainder.length;
+            time += System.nanoTime() - startTime;
         }
     }
 
     /** {@inheritDoc} */
-    @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
-        preStoreDataBlocks(in, len);
-
-        int blockSize = fileInfo.blockSize();
-
-        // If data length is not enough to fill full block, fill the remainder and return.
-        if (remainderDataLen + len < blockSize) {
-            if (remainder == null)
-                remainder = new byte[blockSize];
-            else if (remainder.length != blockSize) {
-                assert remainderDataLen == remainder.length;
+    @Override public void transferFrom(DataInput in, int len) throws IOException {
+        synchronized (mux) {
+            checkClosed(in, len);
 
-                byte[] allocated = new byte[blockSize];
+            long startTime = System.nanoTime();
 
-                U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+            // Clean-up local buffer before streaming.
+            sendBufferIfNotEmpty();
 
-                remainder = allocated;
-            }
-
-            in.readFully(remainder, remainderDataLen, len);
-
-            remainderDataLen += len;
-        }
-        else {
-            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
-                false, streamRange, batch);
+            // Perform transfer.
+            send(in, len);
 
-            remainderDataLen = remainder == null ? 0 : remainder.length;
+            time += System.nanoTime() - startTime;
         }
     }
 
     /**
-     * Initializes data loader if it was not initialized yet and updates written space.
+     * Flushes this output stream and forces any buffered output bytes to be written out.
      *
-     * @param len Data length to be written.
+     * @exception IOException  if an I/O error occurs.
      */
-    private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
-        // Check if any exception happened while writing data.
-        if (writeCompletionFut.isDone()) {
-            assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+    @Override public void flush() throws IOException {
+        synchronized (mux) {
+            checkClosed(null, 0);
 
-            if (in != null)
-                in.skipBytes(len);
+            sendBufferIfNotEmpty();
 
-            writeCompletionFut.get();
-        }
+            flushRemainder();
+
+            awaitAcks();
+
+            // Update file length if needed.
+            if (igfsCtx.configuration().isUpdateFileLengthOnFlush() && space > 0) {
+                try {
+                    IgfsEntryInfo fileInfo0 = igfsCtx.meta().reserveSpace(fileInfo.id(), space, streamRange);
+
+                    if (fileInfo0 == null)
+                        throw new IOException("File was concurrently deleted: " + path);
+                    else
+                        fileInfo = fileInfo0;
+
+                    streamRange = initialStreamRange(fileInfo);
 
-        bytes += len;
-        space += len;
+                    space = 0;
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to update file length data [path=" + path +
+                        ", space=" + space + ']', e);
+                }
+            }
+        }
     }
 
     /**
-     * Flushes this output stream and forces any buffered output bytes to be written out.
+     * Await acknowledgments.
      *
-     * @exception IOException  if an I/O error occurs.
+     * @throws IOException If failed.
      */
-    @Override public synchronized void flush() throws IOException {
-        boolean exists;
-
+    private void awaitAcks() throws IOException {
         try {
-            exists = meta.exists(fileInfo.id());
+            igfsCtx.data().awaitAllAcksReceived(fileInfo.id());
         }
         catch (IgniteCheckedException e) {
-            throw new IOException("File to read file metadata: " + path, e);
+            throw new IOException("Failed to wait for flush acknowledge: " + fileInfo.id, e);
         }
+    }
 
-        if (!exists) {
-            onClose(true);
-
-            throw new IOException("File was concurrently deleted: " + path);
-        }
-
-        super.flush();
-
+    /**
+     * Flush remainder.
+     *
+     * @throws IOException If failed.
+     */
+    private void flushRemainder() throws IOException {
         try {
             if (remainder != null) {
-                data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
+                igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
                     ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
 
                 remainder = null;
                 remainderDataLen = 0;
             }
-
-            if (space > 0) {
-                data.awaitAllAcksReceived(fileInfo.id());
-
-                IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
-
-                if (fileInfo0 == null)
-                    throw new IOException("File was concurrently deleted: " + path);
-                else
-                    fileInfo = fileInfo0;
-
-                streamRange = initialStreamRange(fileInfo);
-
-                space = 0;
-            }
         }
         catch (IgniteCheckedException e) {
-            throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
+            throw new IOException("Failed to flush data (remainder) [path=" + path + ", space=" + space + ']', e);
         }
     }
 
     /** {@inheritDoc} */
-    @Override protected void onClose() throws IOException {
-        onClose(false);
-    }
+    @Override public final void close() throws IOException {
+        synchronized (mux) {
+            // Do nothing if stream is already closed.
+            if (closed)
+                return;
 
-    /**
-     * Close callback. It will be called only once in synchronized section.
-     *
-     * @param deleted Whether we already know that the file was deleted.
-     * @throws IOException If failed.
-     */
-    private void onClose(boolean deleted) throws IOException {
-        assert Thread.holdsLock(this);
-
-        if (onCloseGuard.compareAndSet(false, true)) {
-            // Notify backing secondary file system batch to finish.
-            if (mode != PRIMARY) {
-                assert batch != null;
+            // Set closed flag immediately.
+            closed = true;
 
-                batch.finish();
-            }
+            // Flush data.
+            IOException err = null;
 
-            // Ensure file existence.
-            boolean exists;
+            boolean flushSuccess = false;
 
             try {
-                exists = !deleted && meta.exists(fileInfo.id());
-            }
-            catch (IgniteCheckedException e) {
-                throw new IOException("File to read file metadata: " + path, e);
-            }
+                sendBufferIfNotEmpty();
 
-            if (exists) {
-                IOException err = null;
+                flushRemainder();
 
-                try {
-                    data.writeClose(fileInfo);
+                igfsCtx.data().writeClose(fileInfo.id(), true);
 
-                    writeCompletionFut.get();
-                }
-                catch (IgniteCheckedException e) {
-                    err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e);
-                }
+                flushSuccess = true;
+            }
+            catch (Exception e) {
+                err = new IOException("Failed to flush data during stream close [path=" + path +
+                    ", fileInfo=" + fileInfo + ']', e);
+            }
 
-                metrics.addWrittenBytesTime(bytes, time);
+            // Unlock the file after data is flushed.
+            try {
+                if (flushSuccess && space > 0)
+                    igfsCtx.meta().unlock(fileInfo.id(), fileInfo.lockId(), System.currentTimeMillis(), true,
+                        space, streamRange);
+                else
+                    igfsCtx.meta().unlock(fileInfo.id(), fileInfo.lockId(), System.currentTimeMillis());
+            }
+            catch (Exception e) {
+                if (err == null)
+                    err = new IOException("File to release file lock: " + path, e);
+                else
+                    err.addSuppressed(e);
+            }
+
+            // Finally, await secondary file system flush.
+            if (batch != null) {
+                batch.finish();
 
-                // Await secondary file system processing to finish.
                 if (mode == DUAL_SYNC) {
                     try {
                         batch.await();
@@ -356,40 +336,141 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
                         if (err == null)
                             err = new IOException("Failed to close secondary file system stream [path=" + path +
                                 ", fileInfo=" + fileInfo + ']', e);
+                        else
+                            err.addSuppressed(e);
                     }
                 }
+            }
 
-                long modificationTime = System.currentTimeMillis();
+            // Throw error, if any.
+            if (err != null)
+                throw err;
 
-                try {
-                    meta.unlock(fileInfo, modificationTime);
-                }
-                catch (IgfsPathNotFoundException ignore) {
-                    data.delete(fileInfo); // Safety to ensure that all data blocks are deleted.
+            igfsCtx.igfs().localMetrics().addWrittenBytesTime(bytes, time);
+            igfsCtx.igfs().localMetrics().decrementFilesOpenedForWrite();
 
-                    throw new IOException("File was concurrently deleted: " + path);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IOException("File to read file metadata: " + path, e);
+            GridEventStorageManager evts = igfsCtx.kernalContext().event();
+
+            if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
+                evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
+                    EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+        }
+    }
+
+    /**
+     * Validate this stream is open.
+     *
+     * @throws IOException If this stream is closed.
+     */
+    private void checkClosed(@Nullable DataInput in, int len) throws IOException {
+        assert Thread.holdsLock(mux);
+
+        if (closed) {
+            // Must read data from stream before throwing exception.
+            if (in != null)
+                in.skipBytes(len);
+
+            throw new IOException("Stream has been closed: " + this);
+        }
+    }
+
+    /**
+     * Send local buffer if it is full.
+     *
+     * @throws IOException If failed.
+     */
+    private void sendBufferIfFull() throws IOException {
+        if (buf.position() >= bufSize)
+            sendBuffer();
+    }
+
+    /**
+     * Send local buffer if at least something is stored there.
+     *
+     * @throws IOException If failed.
+     */
+    private void sendBufferIfNotEmpty() throws IOException {
+        if (buf != null && buf.position() > 0)
+            sendBuffer();
+    }
+
+    /**
+     * Send all local-buffered data to server.
+     *
+     * @throws IOException In case of IO exception.
+     */
+    private void sendBuffer() throws IOException {
+        buf.flip();
+
+        send(buf, buf.remaining());
+
+        buf = null;
+    }
+
+    /**
+     * Store data block.
+     *
+     * @param data Block.
+     * @param writeLen Write length.
+     * @throws IOException If failed.
+     */
+    private void send(Object data, int writeLen) throws IOException {
+        assert Thread.holdsLock(mux);
+        assert data instanceof ByteBuffer || data instanceof DataInput;
+
+        try {
+            // Increment metrics.
+            bytes += writeLen;
+            space += writeLen;
+
+            int blockSize = fileInfo.blockSize();
+
+            // If data length is not enough to fill full block, fill the remainder and return.
+            if (remainderDataLen + writeLen < blockSize) {
+                if (remainder == null)
+                    remainder = new byte[blockSize];
+                else if (remainder.length != blockSize) {
+                    assert remainderDataLen == remainder.length;
+
+                    byte[] allocated = new byte[blockSize];
+
+                    U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+
+                    remainder = allocated;
                 }
 
-                if (err != null)
-                    throw err;
+                if (data instanceof ByteBuffer)
+                    ((ByteBuffer) data).get(remainder, remainderDataLen, writeLen);
+                else
+                    ((DataInput) data).readFully(remainder, remainderDataLen, writeLen);
+
+                remainderDataLen += writeLen;
             }
             else {
-                try {
-                    if (mode == DUAL_SYNC)
-                        batch.await();
+                if (data instanceof ByteBuffer) {
+                    remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+                        remainderDataLen, (ByteBuffer) data, false, streamRange, batch);
                 }
-                catch (IgniteCheckedException e) {
-                    throw new IOException("Failed to close secondary file system stream [path=" + path +
-                        ", fileInfo=" + fileInfo + ']', e);
-                }
-                finally {
-                    data.delete(fileInfo);
+                else {
+                    remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+                        remainderDataLen, (DataInput) data, writeLen, false, streamRange, batch);
                 }
+
+                remainderDataLen = remainder == null ? 0 : remainder.length;
             }
         }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to store data into file: " + path, e);
+        }
+    }
+
+    /**
+     * Allocate new buffer.
+     *
+     * @return New buffer.
+     */
+    private ByteBuffer allocateNewBuffer() {
+        return ByteBuffer.allocate(bufSize);
     }
 
     /**
@@ -421,11 +502,46 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
 
         IgniteUuid prevAffKey = map == null ? null : map.affinityKey(lastBlockOff, false);
 
-        IgniteUuid affKey = data.nextAffinityKey(prevAffKey);
+        IgniteUuid affKey = igfsCtx.data().nextAffinityKey(prevAffKey);
 
         return affKey == null ? null : new IgfsFileAffinityRange(off, off, affKey);
     }
 
+    /**
+     * Optimize buffer size.
+     *
+     * @param bufSize Requested buffer size.
+     * @param fileInfo File info.
+     * @return Optimized buffer size.
+     */
+    private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) {
+        assert bufSize > 0;
+
+        if (fileInfo == null)
+            return bufSize;
+
+        int blockSize = fileInfo.blockSize();
+
+        if (blockSize <= 0)
+            return bufSize;
+
+        if (bufSize <= blockSize)
+            // Optimize minimum buffer size to be equal to the file's block size.
+            return blockSize;
+
+        int maxBufSize = blockSize * MAX_BLOCKS_CNT;
+
+        if (bufSize > maxBufSize)
+            // There is no profit or optimization from larger buffers.
+            return maxBufSize;
+
+        if (fileInfo.length() == 0)
+            // Make buffer size multiple of block size (optimized for new files).
+            return bufSize / blockSize * blockSize;
+
+        return bufSize;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgfsOutputStreamImpl.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5706ff/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileUnlockProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileUnlockProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileUnlockProcessor.java
index 6827e4a..8d23490 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileUnlockProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileUnlockProcessor.java
@@ -24,8 +24,11 @@ import org.apache.ignite.binary.BinaryReader;
 import org.apache.ignite.binary.BinaryWriter;
 import org.apache.ignite.binary.Binarylizable;
 import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange;
+import org.apache.ignite.internal.processors.igfs.IgfsFileMap;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
 
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorException;
@@ -46,6 +49,15 @@ public class IgfsMetaFileUnlockProcessor implements EntryProcessor<IgniteUuid, I
     /** Modification time. */
     private long modificationTime;
 
+    /** Whether to update space. */
+    private boolean updateSpace;
+
+    /** Space. */
+    private long space;
+
+    /** Affinity range. */
+    private IgfsFileAffinityRange affRange;
+
     /**
      * Default constructor.
      */
@@ -57,17 +69,36 @@ public class IgfsMetaFileUnlockProcessor implements EntryProcessor<IgniteUuid, I
      * Constructor.
      *
      * @param modificationTime Modification time.
+     * @param updateSpace Whether to update space.
+     * @param space Space.
+     * @param affRange Affinity range.
      */
-    public IgfsMetaFileUnlockProcessor(long modificationTime) {
+    public IgfsMetaFileUnlockProcessor(long modificationTime, boolean updateSpace, long space,
+        @Nullable IgfsFileAffinityRange affRange) {
         this.modificationTime = modificationTime;
+        this.updateSpace = updateSpace;
+        this.space = space;
+        this.affRange = affRange;
     }
 
     /** {@inheritDoc} */
     @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
         throws EntryProcessorException {
-        IgfsEntryInfo old = entry.getValue();
+        IgfsEntryInfo oldInfo = entry.getValue();
+
+        assert oldInfo != null;
+
+        IgfsEntryInfo newInfo = oldInfo.unlock(modificationTime);
+
+        if (updateSpace) {
+            IgfsFileMap newMap = new IgfsFileMap(newInfo.fileMap());
 
-        entry.setValue(old.unlock(modificationTime));
+            newMap.addRange(affRange);
+
+            newInfo = newInfo.length(newInfo.length() + space).fileMap(newMap);
+        }
+
+        entry.setValue(newInfo);
 
         return null;
     }
@@ -75,11 +106,27 @@ public class IgfsMetaFileUnlockProcessor implements EntryProcessor<IgniteUuid, I
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeLong(modificationTime);
+
+        if (updateSpace) {
+            out.writeBoolean(true);
+            out.writeLong(space);
+            out.writeObject(affRange);
+        }
+        else
+            out.writeBoolean(false);
     }
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         modificationTime = in.readLong();
+
+        if (in.readBoolean()) {
+            updateSpace = true;
+            space = in.readLong();
+            affRange = (IgfsFileAffinityRange)in.readObject();
+        }
+        else
+            updateSpace = false;
     }
 
     /** {@inheritDoc} */
@@ -87,6 +134,14 @@ public class IgfsMetaFileUnlockProcessor implements EntryProcessor<IgniteUuid, I
         BinaryRawWriter out = writer.rawWriter();
 
         out.writeLong(modificationTime);
+
+        if (updateSpace) {
+            out.writeBoolean(true);
+            out.writeLong(space);
+            out.writeObject(affRange);
+        }
+        else
+            out.writeBoolean(false);
     }
 
     /** {@inheritDoc} */
@@ -94,6 +149,14 @@ public class IgfsMetaFileUnlockProcessor implements EntryProcessor<IgniteUuid, I
         BinaryRawReader in = reader.rawReader();
 
         modificationTime = in.readLong();
+
+        if (in.readBoolean()) {
+            updateSpace = true;
+            space = in.readLong();
+            affRange = in.readObject();
+        }
+        else
+            updateSpace = false;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5706ff/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 76a038d..261a494 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -2987,10 +2987,12 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      * @param chunks Expected data.
      * @throws Exception If failed.
      */
-    protected void checkFile(IgfsImpl igfs, UniversalFileSystemAdapter igfsSecondary, IgfsPath file,
+    protected void checkFile(@Nullable IgfsImpl igfs, UniversalFileSystemAdapter igfsSecondary, IgfsPath file,
         @Nullable byte[]... chunks) throws Exception {
-        checkExist(igfs, file);
-        checkFileContent(igfs, file, chunks);
+        if (igfs != null) {
+            checkExist(igfs, file);
+            checkFileContent(igfs, file, chunks);
+        }
 
         if (dual) {
             checkExist(igfsSecondary, file);
@@ -3016,16 +3018,18 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
                 is = igfs.open(file);
 
                 int chunkIdx = 0;
+                int pos = 0;
 
                 for (byte[] chunk : chunks) {
                     byte[] buf = new byte[chunk.length];
 
-                    is.readFully(0, buf);
+                    is.readFully(pos, buf);
 
                     assert Arrays.equals(chunk, buf) : "Bad chunk [igfs=" + igfs.name() + ", chunkIdx=" + chunkIdx +
                         ", expected=" + Arrays.toString(chunk) + ", actual=" + Arrays.toString(buf) + ']';
 
                     chunkIdx++;
+                    pos += chunk.length;
                 }
 
                 is.close();

http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5706ff/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java
index 013bb18..0d1a66f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java
@@ -178,7 +178,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
 
             rnd.nextBytes(data);
 
-            IgniteInternalFuture<Boolean> fut = mgr.writeStart(info);
+            IgniteInternalFuture<Boolean> fut = mgr.writeStart(info.id());
 
             expectsStoreFail(info, data, "Not enough space reserved to store data");
 
@@ -195,7 +195,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
 
             assert remainder == null;
 
-            mgr.writeClose(info);
+            mgr.writeClose(info.id(), false);
 
             fut.get(3000);
 
@@ -269,7 +269,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
 
             info = info.length(info.length() + data.length + remainder.length);
 
-            IgniteInternalFuture<Boolean> fut = mgr.writeStart(info);
+            IgniteInternalFuture<Boolean> fut = mgr.writeStart(info.id());
 
             IgfsFileAffinityRange range = new IgfsFileAffinityRange();
 
@@ -287,7 +287,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
 
             assert left2 == null;
 
-            mgr.writeClose(info);
+            mgr.writeClose(info.id(), false);
 
             fut.get(3000);
 
@@ -358,7 +358,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
 
             info = info.length(info.length() + data.length * writesCnt);
 
-            IgniteInternalFuture<Boolean> fut = mgr.writeStart(info);
+            IgniteInternalFuture<Boolean> fut = mgr.writeStart(info.id());
 
             for (int j = 0; j < 64; j++) {
                 Arrays.fill(data, (byte)(j / 4));
@@ -369,7 +369,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
                 assert left == null : "No remainder should be returned if flush is true: " + Arrays.toString(left);
             }
 
-            mgr.writeClose(info);
+            mgr.writeClose(info.id(), false);
 
             assertTrue(range.regionEqual(new IgfsFileAffinityRange(0, writesCnt * chunkSize - 1, null)));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5706ff/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
index 0ce1036..29bb2cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
@@ -445,27 +445,41 @@ public class IgfsProcessorValidationSelfTest extends IgfsCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testInvalidEndpointTcpPort() throws Exception {
+    public void testZeroEndpointTcpPort() throws Exception {
+        checkIvalidPort(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNegativeEndpointTcpPort() throws Exception {
+        checkIvalidPort(-1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTooBigEndpointTcpPort() throws Exception {
+        checkIvalidPort(65536);
+    }
+
+    /**
+     * Check invalid port handling.
+     *
+     * @param port Port.
+     * @throws Exception If failed.
+     */
+    private void checkIvalidPort(int port) throws Exception {
         final String failMsg = "IGFS endpoint TCP port is out of range";
         g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class));
 
         final String igfsCfgName = "igfs-cfg";
         final IgfsIpcEndpointConfiguration igfsEndpointCfg = new IgfsIpcEndpointConfiguration();
-        igfsEndpointCfg.setPort(0);
+        igfsEndpointCfg.setPort(port);
         g1IgfsCfg1.setName(igfsCfgName);
         g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
 
         checkGridStartFails(g1Cfg, failMsg, true);
-
-        igfsEndpointCfg.setPort(-1);
-        g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
-
-        checkGridStartFails(g1Cfg, failMsg, true);
-
-        igfsEndpointCfg.setPort(65536);
-        g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
-
-        checkGridStartFails(g1Cfg, failMsg, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5706ff/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
index 57174ea..e5abfea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
@@ -137,7 +137,7 @@ public class IgfsTaskSelfTest extends IgfsCommonAbstractTest {
         metaCacheCfg.setName("metaCache");
         metaCacheCfg.setCacheMode(REPLICATED);
         metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 
         IgniteConfiguration cfg = new IgniteConfiguration();
 


Mime
View raw message