ignite-commits mailing list archives

From sboi...@apache.org
Subject [20/76] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP.
Date Fri, 13 Feb 2015 18:03:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
new file mode 100644
index 0000000..ee629c1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+
+/**
+ * Base class for all GGFS communication messages sent between nodes.
+ */
+public abstract class IgfsCommunicationMessage extends MessageAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(MessageAdapter _msg) {
+    }
+
+    /**
+     * @param marsh Marshaller.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /**
+     * @param marsh Marshaller.
+     * @param ldr Class loader.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf) {
+        writer.setBuffer(buf);
+
+        if (!typeWritten) {
+            if (!writer.writeByte(null, directType()))
+                return false;
+
+            typeWritten = true;
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf) {
+        reader.setBuffer(buf);
+
+        return true;
+    }
+}
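
The writeTo()/readFrom() pair above implements a chained direct-marshalling contract: the base class binds the buffer and emits the direct type byte exactly once (guarded by typeWritten), and a concrete message is expected to chain to super before writing or reading its own fields. A minimal sketch of such a subclass, with a hypothetical name and field, assuming the writer/reader/typeWritten state inherited from MessageAdapter as used above (remaining MessageAdapter plumbing such as clone() omitted):

    class IgfsHelloMessage extends IgfsCommunicationMessage {
        /** */
        private static final long serialVersionUID = 0L;

        /** Hypothetical payload field. */
        private long reqId;

        /** {@inheritDoc} */
        @Override public boolean writeTo(ByteBuffer buf) {
            // Base class binds the buffer and writes the direct type byte once.
            if (!super.writeTo(buf))
                return false;

            // Append subclass fields; returning false signals a full buffer,
            // and the call is simply repeated once more space is available.
            return writer.writeLong("reqId", reqId);
        }

        /** {@inheritDoc} */
        @Override public boolean readFrom(ByteBuffer buf) {
            // Base class binds the buffer for reading.
            if (!super.readFrom(buf))
                return false;

            reqId = reader.readLong("reqId");

            return reader.isLastRead();
        }

        /** {@inheritDoc} */
        @Override public byte directType() {
            return 127; // Hypothetical direct type code.
        }
    }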

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
new file mode 100644
index 0000000..89680d9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.*;
+
+/**
+ * GGFS context holding all required components for GGFS instance.
+ */
+public class IgfsContext {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Configuration. */
+    private final IgfsConfiguration cfg;
+
+    /** Managers. */
+    private List<IgfsManager> mgrs = new LinkedList<>();
+
+    /** Meta manager. */
+    private final IgfsMetaManager metaMgr;
+
+    /** Data manager. */
+    private final IgfsDataManager dataMgr;
+
+    /** Server manager. */
+    private final IgfsServerManager srvMgr;
+
+    /** Fragmentizer manager. */
+    private final IgfsFragmentizerManager fragmentizerMgr;
+
+    /** GGFS instance. */
+    private final IgfsEx ggfs;
+
+    /**
+     * @param ctx Kernal context.
+     * @param cfg GGFS configuration.
+     * @param metaMgr Meta manager.
+     * @param dataMgr Data manager.
+     * @param srvMgr Server manager.
+     * @param fragmentizerMgr Fragmentizer manager.
+     * @throws IgniteCheckedException If GGFS context instantiation failed.
+     */
+    public IgfsContext(
+        GridKernalContext ctx,
+        IgfsConfiguration cfg,
+        IgfsMetaManager metaMgr,
+        IgfsDataManager dataMgr,
+        IgfsServerManager srvMgr,
+        IgfsFragmentizerManager fragmentizerMgr
+    ) throws IgniteCheckedException {
+        this.ctx = ctx;
+        this.cfg = cfg;
+
+        this.metaMgr = add(metaMgr);
+        this.dataMgr = add(dataMgr);
+        this.srvMgr = add(srvMgr);
+        this.fragmentizerMgr = add(fragmentizerMgr);
+
+        ggfs = new IgfsImpl(this);
+    }
+
+    /**
+     * @return GGFS instance.
+     */
+    public IgfsEx ggfs() {
+        return ggfs;
+    }
+
+    /**
+     * @return Kernal context.
+     */
+    public GridKernalContext kernalContext() {
+        return ctx;
+    }
+
+    /**
+     * @return GGFS configuration.
+     */
+    public IgfsConfiguration configuration() {
+        return cfg;
+    }
+
+    /**
+     * @return List of managers, in starting order.
+     */
+    public List<IgfsManager> managers() {
+        return mgrs;
+    }
+
+    /**
+     * @return Meta manager.
+     */
+    public IgfsMetaManager meta() {
+        return metaMgr;
+    }
+
+    /**
+     * @return Data manager.
+     */
+    public IgfsDataManager data() {
+        return dataMgr;
+    }
+
+    /**
+     * @return Server manager.
+     */
+    public IgfsServerManager server() {
+        return srvMgr;
+    }
+
+    /**
+     * @return Fragmentizer manager.
+     */
+    public IgfsFragmentizerManager fragmentizer() {
+        return fragmentizerMgr;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param topic Topic.
+     * @param msg Message.
+     * @param plc Policy.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void send(UUID nodeId, Object topic, IgfsCommunicationMessage msg, GridIoPolicy plc)
+        throws IgniteCheckedException {
+        if (!kernalContext().localNodeId().equals(nodeId))
+            msg.prepareMarshal(kernalContext().config().getMarshaller());
+
+        kernalContext().io().send(nodeId, topic, msg, plc);
+    }
+
+    /**
+     * @param node Node.
+     * @param topic Topic.
+     * @param msg Message.
+     * @param plc Policy.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void send(ClusterNode node, Object topic, IgfsCommunicationMessage msg, GridIoPolicy plc)
+        throws IgniteCheckedException {
+        if (!kernalContext().localNodeId().equals(node.id()))
+            msg.prepareMarshal(kernalContext().config().getMarshaller());
+
+        kernalContext().io().send(node, topic, msg, plc);
+    }
+
+    /**
+     * Checks if given node is a GGFS node.
+     *
+     * @param node Node to check.
+     * @return {@code True} if node has GGFS with this name, {@code false} otherwise.
+     */
+    public boolean ggfsNode(ClusterNode node) {
+        assert node != null;
+
+        IgfsAttributes[] ggfs = node.attribute(ATTR_GGFS);
+
+        if (ggfs != null)
+            for (IgfsAttributes attrs : ggfs)
+                if (F.eq(cfg.getName(), attrs.ggfsName()))
+                    return true;
+
+        return false;
+    }
+
+    /**
+     * Adds manager to managers list.
+     *
+     * @param mgr Manager.
+     * @return Added manager.
+     */
+    private <T extends IgfsManager> T add(@Nullable T mgr) {
+        if (mgr != null)
+            mgrs.add(mgr);
+
+        return mgr;
+    }
+}
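
Note that both send() overloads call prepareMarshal() only when the destination is not the local node, since loopback delivery never crosses the marshaller. A rough usage sketch tying these helpers to ggfsNode(), using a hypothetical broadcast method that is not part of this commit:

    void broadcast(IgfsContext igfsCtx, Object topic, IgfsCommunicationMessage msg) throws IgniteCheckedException {
        for (ClusterNode node : igfsCtx.kernalContext().discovery().allNodes()) {
            // Only nodes advertising a same-name GGFS in their attributes can process the message.
            if (igfsCtx.ggfsNode(node))
                // send() marshals via prepareMarshal() only when the target is remote.
                igfsCtx.send(node, topic, msg, GridIoPolicy.SYSTEM_POOL);
        }
    }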

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
new file mode 100644
index 0000000..14fb6c5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -0,0 +1,1906 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.task.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.thread.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Cache-based file data container.
+ */
+public class IgfsDataManager extends IgfsManager {
+    /** GGFS. */
+    private IgfsEx ggfs;
+
+    /** Data cache projection. */
+    private GridCacheProjectionEx<IgfsBlockKey, byte[]> dataCachePrj;
+
+    /** Data cache. */
+    private GridCache<Object, Object> dataCache;
+
+    /** */
+    private IgniteInternalFuture<?> dataCacheStartFut;
+
+    /** Local GGFS metrics. */
+    private IgfsLocalMetrics metrics;
+
+    /** Group block size. */
+    private long grpBlockSize;
+
+    /** Group size. */
+    private int grpSize;
+
+    /** Byte buffer writer. */
+    private ByteBufferBlocksWriter byteBufWriter = new ByteBufferBlocksWriter();
+
+    /** Data input writer. */
+    private DataInputBlocksWriter dataInputWriter = new DataInputBlocksWriter();
+
+    /** Pending writes future. */
+    private ConcurrentMap<IgniteUuid, WriteCompletionFuture> pendingWrites = new ConcurrentHashMap8<>();
+
+    /** Affinity key generator. */
+    private AtomicLong affKeyGen = new AtomicLong();
+
+    /** GGFS executor service. */
+    private ExecutorService ggfsSvc;
+
+    /** Request ID counter for write messages. */
+    private AtomicLong reqIdCtr = new AtomicLong();
+
+    /** GGFS communication topic. */
+    private Object topic;
+
+    /** Async file delete worker. */
+    private AsyncDeleteWorker delWorker;
+
+    /** Trash purge timeout. */
+    private long trashPurgeTimeout;
+
+    /** On-going remote reads futures. */
+    private final ConcurrentHashMap8<IgfsBlockKey, IgniteInternalFuture<byte[]>> rmtReadFuts =
+        new ConcurrentHashMap8<>();
+
+    /** Executor service for puts in dual mode. */
+    private volatile ExecutorService putExecSvc;
+
+    /** Executor service for puts in dual mode shutdown flag. */
+    private volatile boolean putExecSvcShutdown;
+
+    /** Maximum amount of data in pending puts. */
+    private volatile long maxPendingPuts;
+
+    /** Current amount of data in pending puts. */
+    private long curPendingPuts;
+
+    /** Lock for pending puts. */
+    private final Lock pendingPutsLock = new ReentrantLock();
+
+    /** Condition for pending puts. */
+    private final Condition pendingPutsCond = pendingPutsLock.newCondition();
+
+    /**
+     * Awaits data cache start.
+     */
+    void awaitInit() {
+        if (!dataCacheStartFut.isDone()) {
+            try {
+                dataCacheStartFut.get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        ggfs = igfsCtx.ggfs();
+
+        dataCachePrj = igfsCtx.kernalContext().cache().internalCache(igfsCtx.configuration().getDataCacheName());
+        dataCache = igfsCtx.kernalContext().cache().internalCache(igfsCtx.configuration().getDataCacheName());
+
+        dataCacheStartFut = igfsCtx.kernalContext().cache().internalCache(igfsCtx.configuration().getDataCacheName())
+            .preloader().startFuture();
+
+        if (dataCache.configuration().getAtomicityMode() != TRANSACTIONAL)
+            throw new IgniteCheckedException("Data cache should be transactional: " +
+                igfsCtx.configuration().getDataCacheName());
+
+        metrics = igfsCtx.ggfs().localMetrics();
+
+        assert dataCachePrj != null;
+
+        CacheAffinityKeyMapper mapper = igfsCtx.kernalContext().cache()
+            .internalCache(igfsCtx.configuration().getDataCacheName()).configuration().getAffinityMapper();
+
+        grpSize = mapper instanceof IgfsGroupDataBlocksKeyMapper ?
+            ((IgfsGroupDataBlocksKeyMapper)mapper).groupSize() : 1;
+
+        grpBlockSize = igfsCtx.configuration().getBlockSize() * grpSize;
+
+        String ggfsName = igfsCtx.configuration().getName();
+
+        topic = F.isEmpty(ggfsName) ? TOPIC_GGFS : TOPIC_GGFS.topic(ggfsName);
+
+        igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                if (msg instanceof IgfsBlocksMessage)
+                    processBlocksMessage(nodeId, (IgfsBlocksMessage)msg);
+                else if (msg instanceof IgfsAckMessage)
+                    processAckMessage(nodeId, (IgfsAckMessage)msg);
+            }
+        });
+
+        igfsCtx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
+            @Override public void onEvent(Event evt) {
+                assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                if (igfsCtx.ggfsNode(discoEvt.eventNode())) {
+                    for (WriteCompletionFuture future : pendingWrites.values()) {
+                        future.onError(discoEvt.eventNode().id(),
+                            new ClusterTopologyCheckedException("Node left grid before write completed: " + evt.node().id()));
+                    }
+                }
+            }
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        ggfsSvc = igfsCtx.kernalContext().getGgfsExecutorService();
+
+        trashPurgeTimeout = igfsCtx.configuration().getTrashPurgeTimeout();
+
+        putExecSvc = igfsCtx.configuration().getDualModePutExecutorService();
+
+        if (putExecSvc != null)
+            putExecSvcShutdown = igfsCtx.configuration().getDualModePutExecutorServiceShutdown();
+        else {
+            int coresCnt = Runtime.getRuntime().availableProcessors();
+
+            // Note that we do not pre-start threads here as GGFS pool may not be needed.
+            putExecSvc = new IgniteThreadPoolExecutor(coresCnt, coresCnt, 0, new LinkedBlockingDeque<Runnable>());
+
+            putExecSvcShutdown = true;
+        }
+
+        maxPendingPuts = igfsCtx.configuration().getDualModeMaxPendingPutsSize();
+
+        delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(),
+            "ggfs-" + ggfsName + "-delete-worker", log);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStart0() throws IgniteCheckedException {
+        new Thread(delWorker).start();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStop0(boolean cancel) {
+        if (cancel)
+            delWorker.cancel();
+        else
+            delWorker.stop();
+
+        try {
+        // Always wait for the thread to exit.
+            U.join(delWorker);
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            log.warning("Got interrupter while waiting for delete worker to stop (will continue stopping).", e);
+        }
+
+        if (putExecSvcShutdown)
+            U.shutdownNow(getClass(), putExecSvc, log);
+    }
+
+    /**
+     * @return Number of bytes used to store files.
+     */
+    public long spaceSize() {
+        return dataCachePrj.ggfsDataSpaceUsed();
+    }
+
+    /**
+     * @return Maximum number of bytes for GGFS data cache.
+     */
+    public long maxSpaceSize() {
+        return dataCachePrj.ggfsDataSpaceMax();
+    }
+
+    /**
+     * Generates next affinity key for local node based on current topology. If the previous affinity key maps
+     * to the local node, it is returned to prevent unnecessary file map growth.
+     *
+     * @param prevAffKey Affinity key of previous block.
+     * @return Affinity key.
+     */
+    public IgniteUuid nextAffinityKey(@Nullable IgniteUuid prevAffKey) {
+        // Do not generate affinity key for non-affinity nodes.
+        if (!isAffinityNode(dataCache.configuration()))
+            return null;
+
+        UUID nodeId = igfsCtx.kernalContext().localNodeId();
+
+        if (prevAffKey != null && dataCache.affinity().mapKeyToNode(prevAffKey).isLocal())
+            return prevAffKey;
+
+        while (true) {
+            IgniteUuid key = new IgniteUuid(nodeId, affKeyGen.getAndIncrement());
+
+            if (dataCache.affinity().mapKeyToNode(key).isLocal())
+                return key;
+        }
+    }
+
+    /**
+     * Maps affinity key to node.
+     *
+     * @param affinityKey Affinity key to map.
+     * @return Primary node for this key.
+     */
+    public ClusterNode affinityNode(Object affinityKey) {
+        return dataCache.affinity().mapKeyToNode(affinityKey);
+    }
+
+    /**
+     * Creates new instance of explicit data loader.
+     *
+     * @return New instance of data loader.
+     */
+    private IgniteDataLoader<IgfsBlockKey, byte[]> dataLoader() {
+        IgniteDataLoader<IgfsBlockKey, byte[]> ldr =
+            igfsCtx.kernalContext().<IgfsBlockKey, byte[]>dataLoad().dataLoader(dataCachePrj.name());
+
+        IgfsConfiguration cfg = igfsCtx.configuration();
+
+        if (cfg.getPerNodeBatchSize() > 0)
+            ldr.perNodeBufferSize(cfg.getPerNodeBatchSize());
+
+        if (cfg.getPerNodeParallelBatchCount() > 0)
+            ldr.perNodeParallelLoadOperations(cfg.getPerNodeParallelBatchCount());
+
+        ldr.updater(GridDataLoadCacheUpdaters.<IgfsBlockKey, byte[]>batchedSorted());
+
+        return ldr;
+    }
+
+    /**
+     * Get list of local data blocks of the given file.
+     *
+     * @param fileInfo File info.
+     * @return List of local data block indices.
+     * @throws IgniteCheckedException If failed.
+     */
+    public List<Long> listLocalDataBlocks(IgfsFileInfo fileInfo)
+        throws IgniteCheckedException {
+        assert fileInfo != null;
+
+        int prevGrpIdx = 0; // Block index within affinity group.
+
+        boolean prevPrimaryFlag = false; // Whether previous block was primary.
+
+        List<Long> res = new ArrayList<>();
+
+        for (long i = 0; i < fileInfo.blocksCount(); i++) {
+            // Determine group index.
+            int grpIdx = (int)(i % grpSize);
+
+            if (prevGrpIdx < grpIdx) {
+                // Reuse existing affinity result.
+                if (prevPrimaryFlag)
+                    res.add(i);
+            }
+            else {
+                // Re-calculate affinity result.
+                IgfsBlockKey key = new IgfsBlockKey(fileInfo.id(), fileInfo.affinityKey(),
+                    fileInfo.evictExclude(), i);
+
+                Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(key);
+
+                assert affNodes != null && !affNodes.isEmpty();
+
+                ClusterNode primaryNode = affNodes.iterator().next();
+
+                if (primaryNode.id().equals(igfsCtx.kernalContext().localNodeId())) {
+                    res.add(i);
+
+                    prevPrimaryFlag = true;
+                }
+                else
+                    prevPrimaryFlag = false;
+            }
+
+            prevGrpIdx = grpIdx;
+        }
+
+        return res;
+    }
+
+    /**
+     * Get data block for specified file ID and block index.
+     *
+     * @param fileInfo File info.
+     * @param path Path being read from.
+     * @param blockIdx Block index.
+     * @param secReader Optional secondary file system reader.
+     * @return Requested data block or {@code null} if nothing found.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public IgniteInternalFuture<byte[]> dataBlock(final IgfsFileInfo fileInfo, final IgfsPath path,
+        final long blockIdx, @Nullable final IgfsReader secReader)
+        throws IgniteCheckedException {
+        //assert validTxState(any); // Allow this method call for any transaction state.
+
+        assert fileInfo != null;
+        assert blockIdx >= 0;
+
+        // Schedule block request BEFORE prefetch requests.
+        final IgfsBlockKey key = blockKey(blockIdx, fileInfo);
+
+        if (log.isDebugEnabled() &&
+            !dataCache.affinity().isPrimaryOrBackup(igfsCtx.kernalContext().discovery().localNode(), key)) {
+            log.debug("Reading non-local data block [path=" + path + ", fileInfo=" + fileInfo +
+                ", blockIdx=" + blockIdx + ']');
+        }
+
+        IgniteInternalFuture<byte[]> fut = dataCachePrj.getAsync(key);
+
+        if (secReader != null) {
+            fut = fut.chain(new CX1<IgniteInternalFuture<byte[]>, byte[]>() {
+                @Override public byte[] applyx(IgniteInternalFuture<byte[]> fut) throws IgniteCheckedException {
+                    byte[] res = fut.get();
+
+                    if (res == null) {
+                        GridFutureAdapter<byte[]> rmtReadFut = new GridFutureAdapter<>(igfsCtx.kernalContext());
+
+                        IgniteInternalFuture<byte[]> oldRmtReadFut = rmtReadFuts.putIfAbsent(key, rmtReadFut);
+
+                        if (oldRmtReadFut == null) {
+                            try {
+                                if (log.isDebugEnabled())
+                                    log.debug("Reading non-local data block in the secondary file system [path=" +
+                                        path + ", fileInfo=" + fileInfo + ", blockIdx=" + blockIdx + ']');
+
+                                int blockSize = fileInfo.blockSize();
+
+                                long pos = blockIdx * blockSize; // Calculate read position in the secondary file system.
+
+                                res = new byte[blockSize];
+
+                                int read = 0;
+
+                                synchronized (secReader) {
+                                    try {
+                                        // Delegate to the secondary file system.
+                                        while (read < blockSize) {
+                                            int r = secReader.read(pos + read, res, read, blockSize - read);
+
+                                            if (r < 0)
+                                                break;
+
+                                            read += r;
+                                        }
+                                    }
+                                    catch (IOException e) {
+                                        throw new IgniteCheckedException("Failed to read data due to secondary file system " +
+                                            "exception: " + e.getMessage(), e);
+                                    }
+                                }
+
+                                // If we did not read full block at the end of the file - trim it.
+                                if (read != blockSize)
+                                    res = Arrays.copyOf(res, read);
+
+                                rmtReadFut.onDone(res);
+
+                                putSafe(key, res);
+
+                                metrics.addReadBlocks(1, 1);
+                            }
+                            catch (IgniteCheckedException e) {
+                                rmtReadFut.onDone(e);
+
+                                throw e;
+                            }
+                            finally {
+                                boolean rmv = rmtReadFuts.remove(key, rmtReadFut);
+
+                                assert rmv;
+                            }
+                        }
+                        else {
+                            // Wait for existing future to finish and get its result.
+                            res = oldRmtReadFut.get();
+
+                            metrics.addReadBlocks(1, 0);
+                        }
+                    }
+                    else
+                        metrics.addReadBlocks(1, 0);
+
+                    return res;
+                }
+            });
+        }
+        else
+            metrics.addReadBlocks(1, 0);
+
+        return fut;
+    }
+
+    /**
+     * Registers write future in GGFS data manager.
+     *
+     * @param fileInfo File info of file opened to write.
+     * @return Future that will be completed when all ack messages are received or when write failed.
+     */
+    public IgniteInternalFuture<Boolean> writeStart(IgfsFileInfo fileInfo) {
+        WriteCompletionFuture fut = new WriteCompletionFuture(igfsCtx.kernalContext(), fileInfo.id());
+
+        WriteCompletionFuture oldFut = pendingWrites.putIfAbsent(fileInfo.id(), fut);
+
+        assert oldFut == null : "Opened write that is being concurrently written: " + fileInfo;
+
+        if (log.isDebugEnabled())
+            log.debug("Registered write completion future for file output stream [fileInfo=" + fileInfo +
+                ", fut=" + fut + ']');
+
+        return fut;
+    }
+
+    /**
+     * Notifies data manager that no further writes will be performed on stream.
+     *
+     * @param fileInfo File info being written.
+     */
+    public void writeClose(IgfsFileInfo fileInfo) {
+        WriteCompletionFuture fut = pendingWrites.get(fileInfo.id());
+
+        if (fut != null)
+            fut.markWaitingLastAck();
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Failed to find write completion future for file in pending write map (most likely it was " +
+                    "failed): " + fileInfo);
+        }
+    }
+
+    /**
+     * Store data blocks in file.<br/>
+     * Note! If the file is deleted concurrently, we will end up with lost blocks.
+     *
+     * @param fileInfo File info.
+     * @param reservedLen Reserved length.
+     * @param remainder Remainder.
+     * @param remainderLen Remainder length.
+     * @param data Data to store.
+     * @param flush Flush flag.
+     * @param affinityRange Affinity range to update if file write can be colocated.
+     * @param batch Optional secondary file system worker batch.
+     *
+     * @return Remainder if data did not fill full block.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public byte[] storeDataBlocks(
+        IgfsFileInfo fileInfo,
+        long reservedLen,
+        @Nullable byte[] remainder,
+        int remainderLen,
+        ByteBuffer data,
+        boolean flush,
+        IgfsFileAffinityRange affinityRange,
+        @Nullable IgfsFileWorkerBatch batch
+    ) throws IgniteCheckedException {
+        //assert validTxState(any); // Allow this method call for any transaction state.
+
+        return byteBufWriter.storeDataBlocks(fileInfo, reservedLen, remainder, remainderLen, data, data.remaining(),
+            flush, affinityRange, batch);
+    }
+
+    /**
+     * Store data blocks in file.<br/>
+     * Note! If the file is deleted concurrently, we will end up with lost blocks.
+     *
+     * @param fileInfo File info.
+     * @param reservedLen Reserved length.
+     * @param remainder Remainder.
+     * @param remainderLen Remainder length.
+     * @param in Data to store.
+     * @param len Data length to store.
+     * @param flush Flush flag.
+     * @param affinityRange File affinity range to update if file can be colocated.
+     * @param batch Optional secondary file system worker batch.
+     * @return Remainder of data that did not fit the block if {@code flush} flag is {@code false}.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If store failed.
+     */
+    @Nullable public byte[] storeDataBlocks(
+        IgfsFileInfo fileInfo,
+        long reservedLen,
+        @Nullable byte[] remainder,
+        int remainderLen,
+        DataInput in,
+        int len,
+        boolean flush,
+        IgfsFileAffinityRange affinityRange,
+        @Nullable IgfsFileWorkerBatch batch
+    ) throws IgniteCheckedException, IOException {
+        //assert validTxState(any); // Allow this method call for any transaction state.
+
+        return dataInputWriter.storeDataBlocks(fileInfo, reservedLen, remainder, remainderLen, in, len, flush,
+            affinityRange, batch);
+    }
+
+    /**
+     * Delete file's data from data cache.
+     *
+     * @param fileInfo File details to remove data for.
+     * @return Delete future that will be completed when file is actually erased.
+     */
+    public IgniteInternalFuture<Object> delete(IgfsFileInfo fileInfo) {
+        //assert validTxState(any); // Allow this method call for any transaction state.
+
+        if (!fileInfo.isFile()) {
+            if (log.isDebugEnabled())
+                log.debug("Cannot delete content of not-data file: " + fileInfo);
+
+            return new GridFinishedFuture<>(igfsCtx.kernalContext());
+        }
+        else
+            return delWorker.deleteAsync(fileInfo);
+    }
+
+    /**
+     * @param blockIdx Block index.
+     * @param fileInfo File info.
+     * @return Block key.
+     */
+    public IgfsBlockKey blockKey(long blockIdx, IgfsFileInfo fileInfo) {
+        if (fileInfo.affinityKey() != null)
+            return new IgfsBlockKey(fileInfo.id(), fileInfo.affinityKey(), fileInfo.evictExclude(), blockIdx);
+
+        if (fileInfo.fileMap() != null) {
+            IgniteUuid affKey = fileInfo.fileMap().affinityKey(blockIdx * fileInfo.blockSize(), false);
+
+            return new IgfsBlockKey(fileInfo.id(), affKey, fileInfo.evictExclude(), blockIdx);
+        }
+
+        return new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), blockIdx);
+    }
+
+    /**
+     * Tries to remove blocks affected by fragmentizer. If {@code cleanNonColocated} is {@code true}, will remove
+     * non-colocated blocks as well.
+     *
+     * @param fileInfo File info to clean up.
+     * @param range Range to clean up.
+     * @param cleanNonColocated {@code True} if all blocks should be cleaned.
+     */
+    public void cleanBlocks(IgfsFileInfo fileInfo, IgfsFileAffinityRange range, boolean cleanNonColocated) {
+        long startIdx = range.startOffset() / fileInfo.blockSize();
+
+        long endIdx = range.endOffset() / fileInfo.blockSize();
+
+        if (log.isDebugEnabled())
+            log.debug("Cleaning blocks [fileInfo=" + fileInfo + ", range=" + range +
+                ", cleanNonColocated=" + cleanNonColocated + ", startIdx=" + startIdx + ", endIdx=" + endIdx + ']');
+
+        try {
+            try (IgniteDataLoader<IgfsBlockKey, byte[]> ldr = dataLoader()) {
+                for (long idx = startIdx; idx <= endIdx; idx++) {
+                    ldr.removeData(new IgfsBlockKey(fileInfo.id(), range.affinityKey(), fileInfo.evictExclude(),
+                        idx));
+
+                    if (cleanNonColocated)
+                        ldr.removeData(new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), idx));
+                }
+            }
+        }
+        catch (IgniteException e) {
+            log.error("Failed to clean up file range [fileInfo=" + fileInfo + ", range=" + range + ']', e);
+        }
+    }
+
+    /**
+     * Moves all colocated blocks in range to non-colocated keys.
+     * @param fileInfo File info to move data for.
+     * @param range Range to move.
+     */
+    public void spreadBlocks(IgfsFileInfo fileInfo, IgfsFileAffinityRange range) {
+        long startIdx = range.startOffset() / fileInfo.blockSize();
+
+        long endIdx = range.endOffset() / fileInfo.blockSize();
+
+        try {
+            try (IgniteDataLoader<IgfsBlockKey, byte[]> ldr = dataLoader()) {
+                long bytesProcessed = 0;
+
+                for (long idx = startIdx; idx <= endIdx; idx++) {
+                    IgfsBlockKey colocatedKey = new IgfsBlockKey(fileInfo.id(), range.affinityKey(),
+                        fileInfo.evictExclude(), idx);
+
+                    IgfsBlockKey key = new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), idx);
+
+                    // Most of the time should be local get.
+                    byte[] block = dataCachePrj.get(colocatedKey);
+
+                    if (block != null) {
+                        // Need to check if block is partially written.
+                        // If so, must update it in pessimistic transaction.
+                        if (block.length != fileInfo.blockSize()) {
+                            try (IgniteInternalTx tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                                Map<IgfsBlockKey, byte[]> vals = dataCachePrj.getAll(F.asList(colocatedKey, key));
+
+                                byte[] val = vals.get(colocatedKey);
+
+                                if (val != null) {
+                                    dataCachePrj.putx(key, val);
+
+                                    tx.commit();
+                                }
+                                else {
+                                    // File is being concurrently deleted.
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to find colocated file block for spread (will ignore) " +
+                                            "[fileInfo=" + fileInfo + ", range=" + range + ", startIdx=" + startIdx +
+                                            ", endIdx=" + endIdx + ", idx=" + idx + ']');
+                                }
+                            }
+                        }
+                        else
+                            ldr.addData(key, block);
+
+                        bytesProcessed += block.length;
+
+                        if (bytesProcessed >= igfsCtx.configuration().getFragmentizerThrottlingBlockLength()) {
+                            ldr.flush();
+
+                            bytesProcessed = 0;
+
+                            U.sleep(igfsCtx.configuration().getFragmentizerThrottlingDelay());
+                        }
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Failed to find colocated file block for spread (will ignore) " +
+                            "[fileInfo=" + fileInfo + ", range=" + range + ", startIdx=" + startIdx +
+                            ", endIdx=" + endIdx + ", idx=" + idx + ']');
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            log.error("Failed to clean up file range [fileInfo=" + fileInfo + ", range=" + range + ']', e);
+        }
+    }
+
+    /**
+     * Resolve affinity nodes for specified part of file.
+     *
+     * @param info File info to resolve affinity nodes for.
+     * @param start Start position in the file.
+     * @param len File part length to get affinity for.
+     * @return Affinity blocks locations.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Collection<IgfsBlockLocation> affinity(IgfsFileInfo info, long start, long len)
+        throws IgniteCheckedException {
+        return affinity(info, start, len, 0);
+    }
+
+    /**
+     * Resolve affinity nodes for specified part of file.
+     *
+     * @param info File info to resolve affinity nodes for.
+     * @param start Start position in the file.
+     * @param len File part length to get affinity for.
+     * @param maxLen Maximum block length.
+     * @return Affinity blocks locations.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Collection<IgfsBlockLocation> affinity(IgfsFileInfo info, long start, long len, long maxLen)
+        throws IgniteCheckedException {
+        assert validTxState(false);
+        assert info.isFile() : "Failed to get affinity (not a file): " + info;
+        assert start >= 0 : "Start position should not be negative: " + start;
+        assert len >= 0 : "Part length should not be negative: " + len;
+
+        if (log.isDebugEnabled())
+            log.debug("Calculating affinity for file [info=" + info + ", start=" + start + ", len=" + len + ']');
+
+        // Skip affinity resolving, if no data requested.
+        if (len == 0)
+            return Collections.emptyList();
+
+        if (maxLen > 0) {
+            maxLen -= maxLen % info.blockSize();
+
+            // If maxLen is smaller than block size, then adjust it to the block size.
+            if (maxLen < info.blockSize())
+                maxLen = info.blockSize();
+        }
+        else
+            maxLen = 0;
+
+        // When the affinity key is not null, the whole file resides on one node.
+        if (info.affinityKey() != null) {
+            Collection<IgfsBlockLocation> res = new LinkedList<>();
+
+            splitBlocks(start, len, maxLen, dataCache.affinity().mapKeyToPrimaryAndBackups(
+                new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 0)), res);
+
+            return res;
+        }
+
+        // Need to merge ranges affinity with non-colocated affinity.
+        Deque<IgfsBlockLocation> res = new LinkedList<>();
+
+        if (info.fileMap().ranges().isEmpty()) {
+            affinity0(info, start, len, maxLen, res);
+
+            return res;
+        }
+
+        long pos = start;
+        long end = start + len;
+
+        for (IgfsFileAffinityRange range : info.fileMap().ranges()) {
+            if (log.isDebugEnabled())
+                log.debug("Checking range [range=" + range + ", pos=" + pos + ']');
+
+            // If current position is less than range start, add non-colocated affinity ranges.
+            if (range.less(pos)) {
+                long partEnd = Math.min(end, range.startOffset());
+
+                affinity0(info, pos, partEnd - pos, maxLen, res);
+
+                pos = partEnd;
+            }
+
+            IgfsBlockLocation last = res.peekLast();
+
+            if (range.belongs(pos)) {
+                long partEnd = Math.min(range.endOffset() + 1, end);
+
+                Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(
+                    range.affinityKey());
+
+                if (log.isDebugEnabled())
+                    log.debug("Calculated affinity for range [start=" + pos + ", end=" + partEnd +
+                        ", nodes=" + F.nodeIds(affNodes) + ", range=" + range +
+                        ", affNodes=" + F.nodeIds(affNodes) + ']');
+
+                if (last != null && equal(last.nodeIds(), F.viewReadOnly(affNodes, F.node2id()))) {
+                    // Merge with the previous block in result.
+                    res.removeLast();
+
+                    splitBlocks(last.start(), last.length() + partEnd - pos, maxLen, affNodes, res);
+                }
+                else
+                    // Do not merge with the previous block.
+                    splitBlocks(pos, partEnd - pos, maxLen, affNodes, res);
+
+                pos = partEnd;
+            }
+            // Else skip this range.
+
+            if (log.isDebugEnabled())
+                log.debug("Finished range check [range=" + range + ", pos=" + pos + ", res=" + res + ']');
+
+            if (pos == end)
+                break;
+        }
+
+        // Final chunk.
+        if (pos != end)
+            affinity0(info, pos, end - pos, maxLen, res);
+
+        return res;
+    }
+
+    /**
+     * Calculates non-colocated affinity for given file info and given region of file.
+     *
+     * @param info File info.
+     * @param start Start offset.
+     * @param len Length.
+     * @param maxLen Maximum allowed split length.
+     * @param res Result collection to add regions to.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void affinity0(IgfsFileInfo info, long start, long len, long maxLen, Deque<IgfsBlockLocation> res)
+        throws IgniteCheckedException {
+        long firstGrpIdx = start / grpBlockSize;
+        long limitGrpIdx = (start + len + grpBlockSize - 1) / grpBlockSize;
+
+        if (limitGrpIdx - firstGrpIdx > Integer.MAX_VALUE)
+            throw new IgfsException("Failed to get affinity (range is too wide)" +
+                " [info=" + info + ", start=" + start + ", len=" + len + ']');
+
+        if (log.isDebugEnabled())
+            log.debug("Mapping file region [fileInfo=" + info + ", start=" + start + ", len=" + len + ']');
+
+        for (long grpIdx = firstGrpIdx; grpIdx < limitGrpIdx; grpIdx++) {
+            // Boundaries of the block.
+            long blockStart;
+            long blockLen;
+
+            // The first block.
+            if (grpIdx == firstGrpIdx) {
+                blockStart = start % grpBlockSize;
+                blockLen = Math.min(grpBlockSize - blockStart, len);
+            }
+            // The last block.
+            else if (grpIdx == limitGrpIdx - 1) {
+                blockStart = 0;
+                blockLen = (start + len - 1) % grpBlockSize + 1;
+            }
+            // Other blocks.
+            else {
+                blockStart = 0;
+                blockLen = grpBlockSize;
+            }
+
+            // Affinity for the first block in the group.
+            IgfsBlockKey key = new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(),
+                grpIdx * grpSize);
+
+            Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(key);
+
+            if (log.isDebugEnabled())
+                log.debug("Mapped key to nodes [key=" + key + ", nodes=" + F.nodeIds(affNodes) +
+                ", blockStart=" + blockStart + ", blockLen=" + blockLen + ']');
+
+            IgfsBlockLocation last = res.peekLast();
+
+            // Merge with previous affinity block location?
+            if (last != null && equal(last.nodeIds(), F.viewReadOnly(affNodes, F.node2id()))) {
+                // Remove previous incomplete value.
+                res.removeLast();
+
+                // Update affinity block location with merged one.
+                splitBlocks(last.start(), last.length() + blockLen, maxLen, affNodes, res);
+            }
+            else
+                splitBlocks(grpIdx * grpBlockSize + blockStart, blockLen, maxLen, affNodes, res);
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Calculated file affinity [info=" + info + ", start=" + start + ", len=" + len +
+                ", res=" + res + ']');
+    }
+
+    /**
+     * Split blocks according to maximum split length.
+     *
+     * @param start Start position.
+     * @param len Length.
+     * @param maxLen Maximum allowed length.
+     * @param nodes Affinity nodes.
+     * @param res Where to put results.
+     */
+    private void splitBlocks(long start, long len, long maxLen,
+        Collection<ClusterNode> nodes, Collection<IgfsBlockLocation> res) {
+        if (maxLen > 0) {
+            long end = start + len;
+
+            long start0 = start;
+
+            while (start0 < end) {
+                long len0 = Math.min(maxLen, end - start0);
+
+                res.add(new IgfsBlockLocationImpl(start0, len0, nodes));
+
+                start0 += len0;
+            }
+        }
+        else
+            res.add(new IgfsBlockLocationImpl(start, len, nodes));
+    }
+
+    /**
+     * Gets group block size (block size * group size).
+     *
+     * @return Group block size.
+     */
+    public long groupBlockSize() {
+        return grpBlockSize;
+    }
+
+    /**
+     * Check if two collections are equal as if they are lists (with respect to order).
+     *
+     * @param one First collection.
+     * @param two Second collection.
+     * @return {@code True} if equal.
+     */
+    private boolean equal(Collection<UUID> one, Collection<UUID> two) {
+        if (one.size() != two.size())
+            return false;
+
+        Iterator<UUID> it1 = one.iterator();
+        Iterator<UUID> it2 = two.iterator();
+
+        int size = one.size();
+
+        for (int i = 0; i < size; i++) {
+            if (!it1.next().equals(it2.next()))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Check transaction is (not) started.
+     *
+     * @param inTx Expected transaction state.
+     * @return Transaction state is correct.
+     */
+    private boolean validTxState(boolean inTx) {
+        boolean txState = inTx == (dataCachePrj.tx() != null);
+
+        assert txState : (inTx ? "Method cannot be called outside transaction: " :
+            "Method cannot be called in transaction: ") + dataCachePrj.tx();
+
+        return txState;
+    }
+
+    /**
+     * @param fileId File ID.
+     * @param node Node to process blocks on.
+     * @param blocks Blocks to put in cache.
+     * @throws IgniteCheckedException If batch processing failed.
+     */
+    private void processBatch(IgniteUuid fileId, final ClusterNode node,
+        final Map<IgfsBlockKey, byte[]> blocks) throws IgniteCheckedException {
+        final long batchId = reqIdCtr.getAndIncrement();
+
+        final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
+
+        if (completionFut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Missing completion future for file write request (most likely exception occurred " +
+                    "which will be thrown upon stream close) [nodeId=" + node.id() + ", fileId=" + fileId + ']');
+
+            return;
+        }
+
+        // Throw exception if future is failed in the middle of writing.
+        if (completionFut.isDone())
+            completionFut.get();
+
+        completionFut.onWriteRequest(node.id(), batchId);
+
+        final UUID nodeId = node.id();
+
+        if (!node.isLocal()) {
+            final IgfsBlocksMessage msg = new IgfsBlocksMessage(fileId, batchId, blocks);
+
+            callGgfsLocalSafe(new GridPlainCallable<Object>() {
+                @Override @Nullable public Object call() throws Exception {
+                    try {
+                        igfsCtx.send(nodeId, topic, msg, SYSTEM_POOL);
+                    }
+                    catch (IgniteCheckedException e) {
+                        completionFut.onError(nodeId, e);
+                    }
+
+                    return null;
+                }
+            });
+        }
+        else {
+            callGgfsLocalSafe(new GridPlainCallable<Object>() {
+                @Override @Nullable public Object call() throws Exception {
+                    storeBlocksAsync(blocks).listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> fut) {
+                            try {
+                                fut.get();
+
+                                completionFut.onWriteAck(nodeId, batchId);
+                            }
+                            catch (IgniteCheckedException e) {
+                                completionFut.onError(nodeId, e);
+                            }
+                        }
+                    });
+
+                    return null;
+                }
+            });
+        }
+    }
+
+    /**
+     * If partial block write is attempted, both colocated and non-colocated keys are locked and data is appended
+     * to the correct block.
+     *
+     * @param fileId File ID.
+     * @param colocatedKey Block key.
+     * @param startOff Data start offset within block.
+     * @param data Data to write.
+     * @throws IgniteCheckedException If update failed.
+     */
+    private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff,
+        byte[] data) throws IgniteCheckedException {
+        if (dataCachePrj.ggfsDataSpaceUsed() >= dataCachePrj.ggfsDataSpaceMax()) {
+            try {
+                ggfs.awaitDeletesAsync().get(trashPurgeTimeout);
+            }
+            catch (IgniteFutureTimeoutCheckedException ignore) {
+                // Ignore.
+            }
+
+            // Additional size check.
+            if (dataCachePrj.ggfsDataSpaceUsed() >= dataCachePrj.ggfsDataSpaceMax()) {
+                final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
+
+                if (completionFut == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Missing completion future for file write request (most likely exception occurred " +
+                            "which will be thrown upon stream close) [fileId=" + fileId + ']');
+
+                    return;
+                }
+
+                IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " +
+                    "(GGFS maximum data size exceeded) [used=" + dataCachePrj.ggfsDataSpaceUsed() +
+                    ", allowed=" + dataCachePrj.ggfsDataSpaceMax() + ']');
+
+                completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " +
+                    igfsCtx.kernalContext().localNodeId(), e));
+
+                return;
+            }
+        }
+
+        // No affinity key present, just concat and return.
+        if (colocatedKey.affinityKey() == null) {
+            dataCachePrj.invoke(colocatedKey, new UpdateProcessor(startOff, data));
+
+            return;
+        }
+
+        // If writing from block beginning, just put and return.
+        if (startOff == 0) {
+            dataCachePrj.putx(colocatedKey, data);
+
+            return;
+        }
+
+        // Create non-colocated key.
+        IgfsBlockKey key = new IgfsBlockKey(colocatedKey.getFileId(), null,
+            colocatedKey.evictExclude(), colocatedKey.getBlockId());
+
+        try (IgniteInternalTx tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+            // Lock keys.
+            Map<IgfsBlockKey, byte[]> vals = dataCachePrj.getAll(F.asList(colocatedKey, key));
+
+            boolean hasVal = false;
+
+            UpdateProcessor transformClos = new UpdateProcessor(startOff, data);
+
+            if (vals.get(colocatedKey) != null) {
+                dataCachePrj.invoke(colocatedKey, transformClos);
+
+                hasVal = true;
+            }
+
+            if (vals.get(key) != null) {
+                dataCachePrj.invoke(key, transformClos);
+
+                hasVal = true;
+            }
+
+            if (!hasVal)
+                throw new IgniteCheckedException("Failed to write partial block (no previous data was found in cache) " +
+                    "[key=" + colocatedKey + ", relaxedKey=" + key + ", startOff=" + startOff +
+                    ", dataLen=" + data.length + ']');
+
+            tx.commit();
+        }
+    }
+
+    /**
+     * Executes callable in GGFS executor service. If execution is rejected, the callable will be executed
+     * in the caller thread.
+     *
+     * @param c Callable to execute.
+     */
+    private <T> void callGgfsLocalSafe(Callable<T> c) {
+        try {
+            ggfsSvc.submit(c);
+        }
+        catch (RejectedExecutionException ignored) {
+            // This exception will happen if network speed is too low and data comes faster
+            // than we can send it to remote nodes.
+            try {
+                c.call();
+            }
+            catch (Exception e) {
+                log.warning("Failed to execute GGFS callable: " + c, e);
+            }
+        }
+    }
+
+    /**
+     * Put data block read from the secondary file system into the cache.
+     *
+     * @param key Key.
+     * @param data Data.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void putSafe(final IgfsBlockKey key, final byte[] data) throws IgniteCheckedException {
+        assert key != null;
+        assert data != null;
+
+        if (maxPendingPuts > 0) {
+            pendingPutsLock.lock();
+
+            try {
+                while (curPendingPuts > maxPendingPuts)
+                    pendingPutsCond.await(2000, TimeUnit.MILLISECONDS);
+
+                curPendingPuts += data.length;
+            }
+            catch (InterruptedException ignore) {
+                throw new IgniteCheckedException("Failed to put GGFS data block into cache due to interruption: " + key);
+            }
+            finally {
+                pendingPutsLock.unlock();
+            }
+        }
+
+        Runnable task = new Runnable() {
+            @Override public void run() {
+                try {
+                    dataCachePrj.putx(key, data);
+                }
+                catch (IgniteCheckedException e) {
+                    U.warn(log, "Failed to put GGFS data block into cache [key=" + key + ", err=" + e + ']');
+                }
+                finally {
+                    if (maxPendingPuts > 0) {
+                        pendingPutsLock.lock();
+
+                        try {
+                            curPendingPuts -= data.length;
+
+                            pendingPutsCond.signalAll();
+                        }
+                        finally {
+                            pendingPutsLock.unlock();
+                        }
+                    }
+                }
+            }
+        };
+
+        try {
+            putExecSvc.submit(task);
+        }
+        catch (RejectedExecutionException ignore) {
+            task.run();
+        }
+    }
+
+    /**
+     * @param blocks Blocks to write.
+     * @return Future that will be completed after put is done.
+     */
+    @SuppressWarnings("unchecked")
+    private IgniteInternalFuture<?> storeBlocksAsync(Map<IgfsBlockKey, byte[]> blocks) {
+        assert !blocks.isEmpty();
+
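+        // If the GGFS data space limit is reached, give pending trash deletes a bounded
+        // chance to free space before failing the write with IgfsOutOfSpaceException.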
+        if (dataCachePrj.ggfsDataSpaceUsed() >= dataCachePrj.ggfsDataSpaceMax()) {
+            try {
+                try {
+                    ggfs.awaitDeletesAsync().get(trashPurgeTimeout);
+                }
+                catch (IgniteFutureTimeoutCheckedException ignore) {
+                    // Ignore.
+                }
+
+                // Additional size check.
+                if (dataCachePrj.ggfsDataSpaceUsed() >= dataCachePrj.ggfsDataSpaceMax())
+                    return new GridFinishedFuture<Object>(igfsCtx.kernalContext(),
+                        new IgfsOutOfSpaceException("Failed to write data block (GGFS maximum data size " +
+                            "exceeded) [used=" + dataCachePrj.ggfsDataSpaceUsed() +
+                            ", allowed=" + dataCachePrj.ggfsDataSpaceMax() + ']'));
+
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFuture<>(igfsCtx.kernalContext(), new IgniteCheckedException("Failed to store data " +
+                    "block due to unexpected exception.", e));
+            }
+        }
+
+        return dataCachePrj.putAllAsync(blocks);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param blocksMsg Write request message.
+     */
+    private void processBlocksMessage(final UUID nodeId, final IgfsBlocksMessage blocksMsg) {
+        storeBlocksAsync(blocksMsg.blocks()).listenAsync(new CI1<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> fut) {
+                IgniteCheckedException err = null;
+
+                try {
+                    fut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    err = e;
+                }
+
+                try {
+                    // Send reply back to node.
+                    igfsCtx.send(nodeId, topic, new IgfsAckMessage(blocksMsg.fileId(), blocksMsg.id(), err),
+                        SYSTEM_POOL);
+                }
+                catch (IgniteCheckedException e) {
+                    U.warn(log, "Failed to send batch acknowledgement (did node leave the grid?) [nodeId=" + nodeId +
+                        ", fileId=" + blocksMsg.fileId() + ", batchId=" + blocksMsg.id() + ']', e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param ackMsg Write acknowledgement message.
+     */
+    private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) {
+        try {
+            ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to unmarshal message (will ignore): " + ackMsg, e);
+
+            return;
+        }
+
+        IgniteUuid fileId = ackMsg.fileId();
+
+        WriteCompletionFuture fut = pendingWrites.get(fileId);
+
+        if (fut != null) {
+            if (ackMsg.error() != null)
+                fut.onError(nodeId, ackMsg.error());
+            else
+                fut.onWriteAck(nodeId, ackMsg.id());
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Received write acknowledgement for non-existent write future (most likely future was " +
+                    "failed) [nodeId=" + nodeId + ", fileId=" + fileId + ']');
+        }
+    }
+
+    /**
+     * Creates block key based on block ID, file info and local affinity range.
+     *
+     * @param block Block ID.
+     * @param fileInfo File info being written.
+     * @param locRange Local affinity range to update.
+     * @return Block key.
+     */
+    private IgfsBlockKey createBlockKey(
+        long block,
+        IgfsFileInfo fileInfo,
+        IgfsFileAffinityRange locRange
+    ) {
+        // If affinityKey is present, return block key as is.
+        if (fileInfo.affinityKey() != null)
+            return new IgfsBlockKey(fileInfo.id(), fileInfo.affinityKey(), fileInfo.evictExclude(), block);
+
+        // If range is done, no colocated writes are attempted.
+        if (locRange == null || locRange.done())
+            return new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), block);
+
+        long blockStart = block * fileInfo.blockSize();
+
+        // If block does not belong to new range, return old affinity key.
+        if (locRange.less(blockStart)) {
+            IgniteUuid affKey = fileInfo.fileMap().affinityKey(blockStart, false);
+
+            return new IgfsBlockKey(fileInfo.id(), affKey, fileInfo.evictExclude(), block);
+        }
+
+        // Check if we have enough free space to do colocated writes.
+        if (dataCachePrj.ggfsDataSpaceUsed() > dataCachePrj.ggfsDataSpaceMax() *
+            igfsCtx.configuration().getFragmentizerLocalWritesRatio()) {
+            // Forbid further co-location.
+            locRange.markDone();
+
+            return new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(), block);
+        }
+
+        if (!locRange.belongs(blockStart))
+            locRange.expand(blockStart, fileInfo.blockSize());
+
+        return new IgfsBlockKey(fileInfo.id(), locRange.affinityKey(), fileInfo.evictExclude(), block);
+    }
+
+    /**
+     * Abstract class to handle writes from different types of input data.
+     */
+    private abstract class BlocksWriter<T> {
+        /**
+         * Stores data blocks read from the abstracted source.
+         *
+         * @param fileInfo File info.
+         * @param reservedLen Reserved length.
+         * @param remainder Remainder.
+         * @param remainderLen Remainder length.
+         * @param src Source to read bytes.
+         * @param srcLen Data length to read from source.
+         * @param flush Flush flag.
+         * @param affinityRange Affinity range to update if file write can be colocated.
+         * @param batch Optional secondary file system worker batch.
+         * @return Data remainder if {@code flush} flag is {@code false}.
+         * @throws IgniteCheckedException If failed.
+         */
+        @Nullable public byte[] storeDataBlocks(
+            IgfsFileInfo fileInfo,
+            long reservedLen,
+            @Nullable byte[] remainder,
+            final int remainderLen,
+            T src,
+            int srcLen,
+            boolean flush,
+            IgfsFileAffinityRange affinityRange,
+            @Nullable IgfsFileWorkerBatch batch
+        ) throws IgniteCheckedException {
+            IgniteUuid id = fileInfo.id();
+            int blockSize = fileInfo.blockSize();
+
+            int len = remainderLen + srcLen;
+
+            if (len > reservedLen)
+                throw new IgfsException("Not enough space reserved to store data [id=" + id +
+                    ", reservedLen=" + reservedLen + ", remainderLen=" + remainderLen +
+                    ", data.length=" + srcLen + ']');
+
+            long start = reservedLen - len;
+            long first = start / blockSize;
+            long limit = (start + len + blockSize - 1) / blockSize;
+            int written = 0;
+            int remainderOff = 0;
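+
+        // Worked example (hypothetical numbers): with blockSize=4, reservedLen=10 and len=6,
+        // start=4, first=1 and limit=3; block 1 then spans in-block offsets 0..3 and block 2
+        // (the last one) spans offsets 0..1, so exactly len=6 bytes are written in total.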
+
+            Map<IgfsBlockKey, byte[]> nodeBlocks = U.newLinkedHashMap((int)(limit - first));
+            ClusterNode node = null;
+            int off = 0;
+
+            for (long block = first; block < limit; block++) {
+                final long blockStartOff = block == first ? (start % blockSize) : 0;
+                final long blockEndOff = block == (limit - 1) ? (start + len - 1) % blockSize : (blockSize - 1);
+
+                final long size = blockEndOff - blockStartOff + 1;
+
+                assert size > 0 && size <= blockSize;
+                assert blockStartOff + size <= blockSize;
+
+                final byte[] portion = new byte[(int)size];
+
+                // Data length to copy from remainder.
+                int portionOff = Math.min((int)size, remainderLen - remainderOff);
+
+                if (remainderOff != remainderLen) {
+                    U.arrayCopy(remainder, remainderOff, portion, 0, portionOff);
+
+                    remainderOff += portionOff;
+                }
+
+                if (portionOff < size)
+                    readData(src, portion, portionOff);
+
+                // Will update range if necessary.
+                IgfsBlockKey key = createBlockKey(block, fileInfo, affinityRange);
+
+                ClusterNode primaryNode = dataCachePrj.cache().affinity().mapKeyToNode(key);
+
+                if (block == first) {
+                    off = (int)blockStartOff;
+                    node = primaryNode;
+                }
+
+                if (size == blockSize) {
+                    assert blockStartOff == 0 : "Cannot write a whole block from a non-zero start position [start=" +
+                        start + ", block=" + block + ", blockStartOff=" + blockStartOff + ", blockEndOff=" +
+                        blockEndOff + ", size=" + size + ", first=" + first + ", limit=" + limit + ", blockSize=" +
+                        blockSize + ']';
+                }
+                else {
+                    // If partial block is being written from the beginning and not flush, return it as remainder.
+                    if (blockStartOff == 0 && !flush) {
+                        assert written + portion.length == len;
+
+                        if (!nodeBlocks.isEmpty()) {
+                            processBatch(id, node, nodeBlocks);
+
+                            metrics.addWriteBlocks(1, 0);
+                        }
+
+                        return portion;
+                    }
+                }
+
+                int writtenSecondary = 0;
+
+                if (batch != null) {
+                    if (!batch.write(portion))
+                        throw new IgniteCheckedException("Cannot write more data to the secondary file system output " +
+                            "stream because it was marked as closed: " + batch.path());
+                    else
+                        writtenSecondary = 1;
+                }
+
+                assert primaryNode != null;
+
+                int writtenTotal = 0;
+
+                if (!primaryNode.id().equals(node.id())) {
+                    if (!nodeBlocks.isEmpty())
+                        processBatch(id, node, nodeBlocks);
+
+                    writtenTotal = nodeBlocks.size();
+
+                    nodeBlocks = U.newLinkedHashMap((int)(limit - first));
+                    node = primaryNode;
+                }
+
+                assert size == portion.length;
+
+                if (size != blockSize) {
+                    // Partial writes must always be synchronous.
+                    processPartialBlockWrite(id, key, block == first ? off : 0, portion);
+
+                    writtenTotal++;
+                }
+                else
+                    nodeBlocks.put(key, portion);
+
+                metrics.addWriteBlocks(writtenTotal, writtenSecondary);
+
+                written += portion.length;
+            }
+
+            // Process final batch, if exists.
+            if (!nodeBlocks.isEmpty()) {
+                processBatch(id, node, nodeBlocks);
+
+                metrics.addWriteBlocks(nodeBlocks.size(), 0);
+            }
+
+            assert written == len;
+
+            return null;
+        }
+
+        /**
+         * Fully reads data from specified source into the specified byte array.
+         *
+         * @param src Data source.
+         * @param dst Destination.
+         * @param dstOff Destination buffer offset.
+         * @throws IgniteCheckedException If read failed.
+         */
+        protected abstract void readData(T src, byte[] dst, int dstOff) throws IgniteCheckedException;
+    }
+
+    /**
+     * Byte buffer writer.
+     */
+    private class ByteBufferBlocksWriter extends BlocksWriter<ByteBuffer> {
+        /** {@inheritDoc} */
+        @Override protected void readData(ByteBuffer src, byte[] dst, int dstOff) {
+            src.get(dst, dstOff, dst.length - dstOff);
+        }
+    }
+
+    /**
+     * Data input writer.
+     */
+    private class DataInputBlocksWriter extends BlocksWriter<DataInput> {
+        /** {@inheritDoc} */
+        @Override protected void readData(DataInput src, byte[] dst, int dstOff)
+            throws IgniteCheckedException {
+            try {
+                src.readFully(dst, dstOff, dst.length - dstOff);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
+    }
+
+    /**
+     * Helper closure to update data in cache.
+     */
+    @GridInternal
+    private static final class UpdateProcessor implements EntryProcessor<IgfsBlockKey, byte[], Void>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Start position in the block to write new data from. */
+        private int start;
+
+        /** Data block to write into cache. */
+        private byte[] data;
+
+        /**
+         * Empty constructor required for {@link Externalizable}.
+         */
+        public UpdateProcessor() {
+            // No-op.
+        }
+
+        /**
+         * Constructs update data block closure.
+         *
+         * @param start Start position in the block to write new data from.
+         * @param data Data block to write into cache.
+         */
+        private UpdateProcessor(int start, byte[] data) {
+            assert start >= 0;
+            assert data != null;
+            assert start + data.length >= 0 : "Too much data [start=" + start + ", data.length=" + data.length + ']';
+
+            this.start = start;
+            this.data = data;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<IgfsBlockKey, byte[]> entry, Object... args) {
+            byte[] e = entry.getValue();
+
+            final int size = data.length;
+
+            if (e == null || e.length == 0)
+                e = new byte[start + size]; // Don't allocate more than required.
+            else if (e.length < start + size) {
+                // Expand the stored data array if it is smaller than required.
+                byte[] tmp = new byte[start + size]; // Don't allocate more than required.
+
+                U.arrayCopy(e, 0, tmp, 0, e.length);
+
+                e = tmp;
+            }
+
+            // Copy data into entry.
+            U.arrayCopy(data, 0, e, start, size);
+
+            entry.setValue(e);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(start);
+            U.writeByteArray(out, data);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException {
+            start = in.readInt();
+            data = U.readByteArray(in);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(UpdateProcessor.class, this, "start", start, "data.length", data.length);
+        }
+    }
+
+    /**
+     * Asynchronous delete worker.
+     */
+    private class AsyncDeleteWorker extends GridWorker {
+        /** File info for stop request. */
+        private final IgfsFileInfo stopInfo = new IgfsFileInfo();
+
+        /** Delete requests queue. */
+        private BlockingQueue<IgniteBiTuple<GridFutureAdapter<Object>, IgfsFileInfo>> delReqs =
+            new LinkedBlockingQueue<>();
+
+        /**
+         * @param gridName Grid name.
+         * @param name Worker name.
+         * @param log Log.
+         */
+        protected AsyncDeleteWorker(@Nullable String gridName, String name, IgniteLogger log) {
+            super(gridName, name, log);
+        }
+
+        /**
+         * Gracefully stops the worker by adding {@code stopInfo} to the queue.
+         */
+        private void stop() {
+            delReqs.offer(F.t(new GridFutureAdapter<>(igfsCtx.kernalContext()), stopInfo));
+        }
+
+        /**
+         * @param info File info to delete.
+         * @return Future which completes when entry is actually removed.
+         */
+        private IgniteInternalFuture<Object> deleteAsync(IgfsFileInfo info) {
+            GridFutureAdapter<Object> fut = new GridFutureAdapter<>(igfsCtx.kernalContext());
+
+            delReqs.offer(F.t(fut, info));
+
+            return fut;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            try {
+                while (!isCancelled()) {
+                    IgniteBiTuple<GridFutureAdapter<Object>, IgfsFileInfo> req = delReqs.take();
+
+                    GridFutureAdapter<Object> fut = req.get1();
+                    IgfsFileInfo fileInfo = req.get2();
+
+                    // Identity check.
+                    if (fileInfo == stopInfo) {
+                        fut.onDone();
+
+                        break;
+                    }
+
+                    IgniteDataLoader<IgfsBlockKey, byte[]> ldr = dataLoader();
+
+                    try {
+                        IgfsFileMap map = fileInfo.fileMap();
+
+                        for (long block = 0, size = fileInfo.blocksCount(); block < size; block++) {
+                            IgniteUuid affKey = map == null ? null : map.affinityKey(block * fileInfo.blockSize(), true);
+
+                            ldr.removeData(new IgfsBlockKey(fileInfo.id(), affKey, fileInfo.evictExclude(),
+                                block));
+
+                            if (affKey != null)
+                                ldr.removeData(new IgfsBlockKey(fileInfo.id(), null, fileInfo.evictExclude(),
+                                    block));
+                        }
+                    }
+                    catch (IgniteInterruptedException ignored) {
+                        // Ignore interruption during shutdown.
+                    }
+                    catch (IgniteException e) {
+                        log.error("Failed to remove file contents: " + fileInfo, e);
+                    }
+                    finally {
+                        try {
+                            IgniteUuid fileId = fileInfo.id();
+
+                            for (long block = 0, size = fileInfo.blocksCount(); block < size; block++)
+                                ldr.removeData(new IgfsBlockKey(fileId, fileInfo.affinityKey(),
+                                    fileInfo.evictExclude(), block));
+                        }
+                        catch (IgniteException e) {
+                            log.error("Failed to remove file contents: " + fileInfo, e);
+                        }
+                        finally {
+                            try {
+                                ldr.close(isCancelled());
+                            }
+                            catch (IgniteException e) {
+                                log.error("Failed to stop data loader while shutting down ggfs async delete thread.", e);
+                            }
+                            finally {
+                                fut.onDone(); // Complete future.
+                            }
+                        }
+                    }
+                }
+            }
+            finally {
+                if (log.isDebugEnabled())
+                    log.debug("Stopping asynchronous ggfs file delete thread: " + name());
+
+                IgniteBiTuple<GridFutureAdapter<Object>, IgfsFileInfo> req = delReqs.poll();
+
+                while (req != null) {
+                    req.get1().onCancelled();
+
+                    req = delReqs.poll();
+                }
+            }
+        }
+    }
+
+    /**
+     * Future that is completed when all participating nodes have acknowledged their write batches.
+     */
+    private class WriteCompletionFuture extends GridFutureAdapter<Boolean> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** File id to remove future from map. */
+        private IgniteUuid fileId;
+
+        /** Pending acks. */
+        private ConcurrentMap<UUID, Set<Long>> pendingAcks = new ConcurrentHashMap8<>();
+
+        /** Flag indicating future is waiting for last ack. */
+        private volatile boolean awaitingLast;
+
+        /**
+         * Empty constructor required by {@link Externalizable}.
+         */
+        public WriteCompletionFuture() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Kernal context.
+         * @param fileId File id.
+         */
+        private WriteCompletionFuture(GridKernalContext ctx, IgniteUuid fileId) {
+            super(ctx);
+
+            assert fileId != null;
+
+            this.fileId = fileId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+            if (!isDone()) {
+                pendingWrites.remove(fileId, this);
+
+                if (super.onDone(res, err))
+                    return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * Write request will be asynchronously executed on node with given ID.
+         *
+         * @param nodeId Node ID.
+         * @param batchId Assigned batch ID.
+         */
+        private void onWriteRequest(UUID nodeId, long batchId) {
+            if (!isDone()) {
+                Set<Long> reqIds = pendingAcks.get(nodeId);
+
+                if (reqIds == null)
+                    reqIds = F.addIfAbsent(pendingAcks, nodeId, new GridConcurrentHashSet<Long>());
+
+                reqIds.add(batchId);
+            }
+        }
+
+        /**
+         * Error occurred on node with given ID.
+         *
+         * @param nodeId Node ID.
+         * @param e Caught exception.
+         */
+        private void onError(UUID nodeId, IgniteCheckedException e) {
+            Set<Long> reqIds = pendingAcks.get(nodeId);
+
+            // If waiting for ack from this node.
+            if (reqIds != null && !reqIds.isEmpty()) {
+                if (e.hasCause(IgfsOutOfSpaceException.class))
+                    onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " + nodeId, e));
+                else
+                    onDone(new IgniteCheckedException(
+                        "Failed to wait for write completion (write failed on node): " + nodeId, e));
+            }
+        }
+
+        /**
+         * Write ack received from node with given ID for given batch ID.
+         *
+         * @param nodeId Node ID.
+         * @param batchId Batch ID.
+         */
+        private void onWriteAck(UUID nodeId, long batchId) {
+            if (!isDone()) {
+                Set<Long> reqIds = pendingAcks.get(nodeId);
+
+                assert reqIds != null : "Received acknowledgement message for unregistered node [nodeId=" +
+                    nodeId + ", batchId=" + batchId + ']';
+
+                boolean rmv = reqIds.remove(batchId);
+
+                assert rmv : "Received acknowledgement message for unregistered batch [nodeId=" +
+                    nodeId + ", batchId=" + batchId + ']';
+
+                if (awaitingLast && checkCompleted())
+                    onDone(true);
+            }
+        }
+
+        /**
+         * Marks this future as waiting last ack.
+         */
+        private void markWaitingLastAck() {
+            awaitingLast = true;
+
+            if (log.isDebugEnabled())
+                log.debug("Marked write completion future as awaiting last ack: " + fileId);
+
+            if (checkCompleted())
+                onDone(true);
+        }
+
+        /**
+         * @return {@code true} if all request acknowledgements have been received after
+         *      {@link #markWaitingLastAck()} was called.
+         */
+        private boolean checkCompleted() {
+            for (Map.Entry<UUID, Set<Long>> entry : pendingAcks.entrySet()) {
+                Set<Long> reqIds = entry.getValue();
+
+                // If still waiting for some acks.
+                if (!reqIds.isEmpty())
+                    return false;
+            }
+
+            // Got match for each entry in sent map.
+            return true;
+        }
+    }
+}
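
A note for readers skimming the diff: the partial-write path above hinges on UpdateProcessor
growing the cached byte array before copying the new chunk in at its offset. A minimal
standalone sketch of that grow-and-copy step (class and method names here are illustrative,
not part of the patch):

    import java.util.Arrays;

    public final class GrowAndCopy {
        /** Writes {@code data} into {@code block} at offset {@code start}, expanding the block if needed. */
        static byte[] update(byte[] block, int start, byte[] data) {
            int required = start + data.length;

            // Allocate or expand the stored array, but never beyond what is required.
            byte[] res = block == null || block.length < required
                ? Arrays.copyOf(block == null ? new byte[0] : block, required)
                : block;

            System.arraycopy(data, 0, res, start, data.length);

            return res;
        }

        public static void main(String[] args) {
            // Extends the block: [1, 2] plus [3, 4] written at offset 2 gives [1, 2, 3, 4].
            System.out.println(Arrays.toString(update(new byte[] {1, 2}, 2, new byte[] {3, 4})));
        }
    }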

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
new file mode 100644
index 0000000..537fd25
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Indicates that an entry scheduled for deletion was actually deleted.
+ */
+public class IgfsDeleteMessage extends IgfsCommunicationMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Deleted entry ID. */
+    private IgniteUuid id;
+
+    /** Optional error. */
+    @GridDirectTransient
+    private IgniteCheckedException err;
+
+    /** */
+    private byte[] errBytes;
+
+    /**
+     * {@link Externalizable} support.
+     */
+    public IgfsDeleteMessage() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param id Deleted entry ID.
+     */
+    public IgfsDeleteMessage(IgniteUuid id) {
+        assert id != null;
+
+        this.id = id;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param id Entry ID.
+     * @param err Error.
+     */
+    public IgfsDeleteMessage(IgniteUuid id, IgniteCheckedException err) {
+        assert err != null;
+
+        this.id = id;
+        this.err = err;
+    }
+
+    /**
+     * @return Deleted entry ID.
+     */
+    public IgniteUuid id() {
+        return id;
+    }
+
+    /**
+     * @return Error.
+     */
+    public IgniteCheckedException error() {
+        return err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
+        super.prepareMarshal(marsh);
+
+        if (err != null)
+            errBytes = marsh.marshal(err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(marsh, ldr);
+
+        if (errBytes != null)
+            err = marsh.unmarshal(errBytes, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public MessageAdapter clone() {
+        IgfsDeleteMessage _clone = new IgfsDeleteMessage();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(MessageAdapter _msg) {
+        super.clone0(_msg);
+
+        IgfsDeleteMessage _clone = (IgfsDeleteMessage)_msg;
+
+        _clone.id = id;
+        _clone.err = err;
+        _clone.errBytes = errBytes;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!typeWritten) {
+            if (!writer.writeByte(null, directType()))
+                return false;
+
+            typeWritten = true;
+        }
+
+        switch (state) {
+            case 0:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                state++;
+
+            case 1:
+                if (!writer.writeIgniteUuid("id", id))
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        reader.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (state) {
+            case 0:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 1:
+                id = reader.readIgniteUuid("id");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 67;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsDeleteMessage.class, this);
+    }
+}
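
The writeTo/readFrom pair above follows the direct-marshalling convention used by all of these
messages: each field gets a numbered state, the switch cases fall through, and the method
returns false whenever the buffer runs out so the caller can re-invoke it later and resume at
the field where it stopped. A minimal sketch of the same resumable pattern (illustrative only,
standing in for the generated Ignite message writer):

    import java.nio.ByteBuffer;

    /** Writes two ints into possibly undersized buffers, resuming across calls. */
    final class ResumableWriter {
        private int state;

        boolean writeTo(ByteBuffer buf, int a, int b) {
            switch (state) {
                case 0:
                    if (buf.remaining() < 4)
                        return false; // Out of space: caller retries once the buffer is flushed.

                    buf.putInt(a);

                    state++;

                    // Fall through and try the next field within the same call.
                case 1:
                    if (buf.remaining() < 4)
                        return false;

                    buf.putInt(b);

                    state++;
            }

            return true; // All fields written.
        }
    }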

