ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [19/62] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP.
Date Fri, 13 Feb 2015 16:13:43 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
new file mode 100644
index 0000000..02e44e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.*;
+
+/**
+ * GGFS worker for removal from the trash directory.
+ */
+public class IgfsDeleteWorker extends IgfsThread {
+    /** Awake frequency, in milliseconds. */
+    private static final long FREQUENCY = 1000;
+
+    /** How many files/folders to delete at once (i.e. in a single transaction). */
+    private static final int MAX_DELETE_BATCH = 100;
+
+    /** GGFS context. */
+    private final IgfsContext ggfsCtx;
+
+    /** Metadata manager. */
+    private final IgfsMetaManager meta;
+
+    /** Data manager. */
+    private final IgfsDataManager data;
+
+    /** Event manager. */
+    private final GridEventStorageManager evts;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Lock. */
+    private final Lock lock = new ReentrantLock();
+
+    /** Condition. */
+    private final Condition cond = lock.newCondition();
+
+    /** Force worker to perform actual delete. */
+    private boolean force;
+
+    /** Cancellation flag. */
+    private volatile boolean cancelled;
+
+    /** Message topic. */
+    private Object topic;
+
+    /**
+     * Constructor.
+     *
+     * @param ggfsCtx GGFS context.
+     */
+    IgfsDeleteWorker(IgfsContext ggfsCtx) {
+        super("ggfs-delete-worker%" + ggfsCtx.ggfs().name() + "%" + ggfsCtx.kernalContext().localNodeId() + "%");
+
+        this.ggfsCtx = ggfsCtx;
+
+        meta = ggfsCtx.meta();
+        data = ggfsCtx.data();
+
+        evts = ggfsCtx.kernalContext().event();
+
+        String ggfsName = ggfsCtx.ggfs().name();
+
+        topic = F.isEmpty(ggfsName) ? TOPIC_GGFS : TOPIC_GGFS.topic(ggfsName);
+
+        assert meta != null;
+        assert data != null;
+
+        log = ggfsCtx.kernalContext().log(IgfsDeleteWorker.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException {
+        if (log.isDebugEnabled())
+            log.debug("Delete worker started.");
+
+        while (!cancelled) {
+            lock.lock();
+
+            try {
+                if (!cancelled && !force)
+                    cond.await(FREQUENCY, TimeUnit.MILLISECONDS);
+
+                force = false; // Reset force flag.
+            }
+            finally {
+                lock.unlock();
+            }
+
+            if (!cancelled)
+                delete();
+        }
+    }
+
+    /**
+     * Notifies the worker that a new entry to delete has appeared.
+     */
+    void signal() {
+        lock.lock();
+
+        try {
+            force = true;
+
+            cond.signalAll();
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    void cancel() {
+        cancelled = true;
+
+        interrupt();
+    }
+
+    /**
+     * Perform cleanup of the trash directory.
+     */
+    private void delete() {
+        IgfsFileInfo info = null;
+
+        try {
+            info = meta.info(TRASH_ID);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Cannot obtain trash directory info.", e);
+        }
+
+        if (info != null) {
+            for (Map.Entry<String, IgfsListingEntry> entry : info.listing().entrySet()) {
+                IgniteUuid fileId = entry.getValue().fileId();
+
+                if (log.isDebugEnabled())
+                    log.debug("Deleting GGFS trash entry [name=" + entry.getKey() + ", fileId=" + fileId + ']');
+
+                try {
+                    if (!cancelled) {
+                        if (delete(entry.getKey(), fileId)) {
+                            if (log.isDebugEnabled())
+                                log.debug("Sending delete confirmation message [name=" + entry.getKey() +
+                                    ", fileId=" + fileId + ']');
+
+                            sendDeleteMessage(new IgfsDeleteMessage(fileId));
+                        }
+                    }
+                    else
+                        break;
+                }
+                catch (IgniteInterruptedCheckedException ignored) {
+                    // Ignore this exception while stopping.
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e);
+
+                    sendDeleteMessage(new IgfsDeleteMessage(fileId, e));
+                }
+            }
+        }
+    }
+
+    /**
+     * Remove particular entry from the TRASH directory.
+     *
+     * @param name Entry name.
+     * @param id Entry ID.
+     * @return {@code True} in case the entry really was deleted from the file system by this call.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean delete(String name, IgniteUuid id) throws IgniteCheckedException {
+        assert name != null;
+        assert id != null;
+
+        while (true) {
+            IgfsFileInfo info = meta.info(id);
+
+            if (info != null) {
+                if (info.isDirectory()) {
+                    deleteDirectory(TRASH_ID, id);
+
+                    if (meta.delete(TRASH_ID, name, id))
+                        return true;
+                }
+                else {
+                    assert info.isFile();
+
+                    // Delete file content first.
+                    // In case this node crashes, other node will re-delete the file.
+                    data.delete(info).get();
+
+                    boolean ret = meta.delete(TRASH_ID, name, id);
+
+                    if (evts.isRecordable(EVT_GGFS_FILE_PURGED)) {
+                        if (info.path() != null)
+                            evts.record(new IgfsEvent(info.path(),
+                                ggfsCtx.kernalContext().discovery().localNode(), EVT_GGFS_FILE_PURGED));
+                        else
+                            LT.warn(log, null, "Removing file without path info: " + info);
+                    }
+
+                    return ret;
+                }
+            }
+            else
+                return false; // Entry was deleted concurrently.
+        }
+    }
+
+    /**
+     * Remove particular entry from the trash directory or subdirectory.
+     *
+     * @param parentId Parent ID.
+     * @param id Entry id.
+     * @throws IgniteCheckedException If delete failed for some reason.
+     */
+    private void deleteDirectory(IgniteUuid parentId, IgniteUuid id) throws IgniteCheckedException {
+        assert parentId != null;
+        assert id != null;
+
+        while (true) {
+            IgfsFileInfo info = meta.info(id);
+
+            if (info != null) {
+                assert info.isDirectory();
+
+                Map<String, IgfsListingEntry> listing = info.listing();
+
+                if (listing.isEmpty())
+                    return; // Directory is empty.
+
+                Map<String, IgfsListingEntry> delListing;
+
+                if (listing.size() <= MAX_DELETE_BATCH)
+                    delListing = listing;
+                else {
+                    delListing = new HashMap<>(MAX_DELETE_BATCH, 1.0f);
+
+                    int i = 0;
+
+                    for (Map.Entry<String, IgfsListingEntry> entry : listing.entrySet()) {
+                        delListing.put(entry.getKey(), entry.getValue());
+
+                        if (++i == MAX_DELETE_BATCH)
+                            break;
+                    }
+                }
+
+                GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>(ggfsCtx.kernalContext());
+
+                // Delegate to child folders.
+                for (IgfsListingEntry entry : delListing.values()) {
+                    if (!cancelled) {
+                        if (entry.isDirectory())
+                            deleteDirectory(id, entry.fileId());
+                        else {
+                            IgfsFileInfo fileInfo = meta.info(entry.fileId());
+
+                            if (fileInfo != null) {
+                                assert fileInfo.isFile();
+
+                                fut.add(data.delete(fileInfo));
+                            }
+                        }
+                    }
+                    else
+                        return;
+                }
+
+                fut.markInitialized();
+
+                // Wait for data cache to delete values before clearing meta cache.
+                try {
+                    fut.get();
+                }
+                catch (IgniteFutureCancelledCheckedException ignore) {
+                    // This future can be cancelled only due to GGFS shutdown.
+                    cancelled = true;
+
+                    return;
+                }
+
+                // Actual delete of folder content.
+                Collection<IgniteUuid> delIds = meta.delete(id, delListing);
+
+                if (delListing == listing && delListing.size() == delIds.size())
+                    break; // All entries were deleted.
+            }
+            else
+                break; // Entry was deleted concurrently.
+        }
+    }
+
+    /**
+     * Send delete message to all meta cache nodes in the grid.
+     *
+     * @param msg Message to send.
+     */
+    private void sendDeleteMessage(IgfsDeleteMessage msg) {
+        assert msg != null;
+
+        Collection<ClusterNode> nodes = meta.metaCacheNodes();
+
+        for (ClusterNode node : nodes) {
+            try {
+                ggfsCtx.send(node, topic, msg, GridIoPolicy.SYSTEM_POOL);
+            }
+            catch (IgniteCheckedException e) {
+                U.warn(log, "Failed to send GGFS delete message to node [nodeId=" + node.id() +
+                    ", msg=" + msg + ", err=" + e.getMessage() + ']');
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryNotEmptyException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryNotEmptyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryNotEmptyException.java
new file mode 100644
index 0000000..6f63f43
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryNotEmptyException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.*;
+
+/**
+ * Exception indicating that directory can not be deleted because it is not empty.
+ */
+public class IgfsDirectoryNotEmptyException extends IgfsException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param msg Exception message.
+     */
+    public IgfsDirectoryNotEmptyException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates an instance of GGFS exception caused by nested exception.
+     *
+     * @param cause Exception cause.
+     */
+    public IgfsDirectoryNotEmptyException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
new file mode 100644
index 0000000..c190d7f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+
+/**
+ * Internal API extension for {@link org.apache.ignite.IgniteFs}.
+ */
+public interface IgfsEx extends IgniteFs {
+    /**
+     * Stops GGFS, cleaning up all used resources.
+     */
+    public void stop();
+
+    /**
+     * @return GGFS context.
+     */
+    public IgfsContext context();
+
+    /**
+     * Get handshake message.
+     *
+     * @return Handshake message.
+     */
+    public IgfsPaths proxyPaths();
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch)
+        throws IgniteException;
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgfsPath path) throws IgniteException;
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) throws IgniteException;
+
+    /**
+     * Gets global space counters.
+     *
+     * @return Tuple in which first component is used space on all nodes,
+     *      second is available space on all nodes.
+     * @throws IgniteCheckedException If task execution failed.
+     */
+    public IgfsStatus globalSpace() throws IgniteCheckedException;
+
+    /**
+     * Enables, disables or clears sampling flag.
+     *
+     * @param val {@code True} to turn on sampling, {@code false} to turn it off, {@code null} to clear sampling state.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException;
+
+    /**
+     * Get sampling state.
+     *
+     * @return {@code True} in case sampling is enabled, {@code false} otherwise, or {@code null} in case sampling
+     * flag is not set.
+     */
+    @Nullable public Boolean globalSampling();
+
+    /**
+     * Get local metrics.
+     *
+     * @return Local metrics.
+     */
+    public IgfsLocalMetrics localMetrics();
+
+    /**
+     * Gets group block size, i.e. block size multiplied by group size in affinity mapper.
+     *
+     * @return Group block size.
+     */
+    public long groupBlockSize();
+
+    /**
+     * Asynchronously awaits removal of all entries currently existing in trash.
+     *
+     * @return Future completed once all entries that existed in trash at the time of invocation are removed.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException;
+
+    /**
+     * Gets client file system log directory.
+     *
+     * @return Client file system log directory or {@code null} in case no client connections have been created yet.
+     */
+    @Nullable public String clientLogDirectory();
+
+    /**
+     * Sets client file system log directory.
+     *
+     * @param logDir Client file system log directory.
+     */
+    public void clientLogDirectory(String logDir);
+
+    /**
+     * Whether this path is excluded from evictions.
+     *
+     * @param path Path.
+     * @param primary Whether the mode is PRIMARY.
+     * @return {@code True} if path is excluded from evictions.
+     */
+    public boolean evictExclude(IgfsPath path, boolean primary);
+
+    /**
+     * Get next affinity key.
+     *
+     * @return Next affinity key.
+     */
+    public IgniteUuid nextAffinityKey();
+
+    /**
+     * Check whether the given path is proxy path.
+     *
+     * @param path Path.
+     * @return {@code True} if proxy.
+     */
+    public boolean isProxy(URI path);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java
new file mode 100644
index 0000000..c04345a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Affinity range.
+ */
+public class IgfsFileAffinityRange extends MessageAdapter implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Initial range status, right after creation. */
+    public static final int RANGE_STATUS_INITIAL = 0;
+
+    /** Moving range state. Fragmentizer started blocks copy. */
+    public static final int RANGE_STATUS_MOVING = 1;
+
+    /** Fragmentizer finished block copy for this range. */
+    public static final int RANGE_STATUS_MOVED = 2;
+
+    /** Range affinity key. */
+    private IgniteUuid affKey;
+
+    /** Range status, one of the {@code RANGE_STATUS_*} constants. */
+    @SuppressWarnings("RedundantFieldInitialization")
+    private int status = RANGE_STATUS_INITIAL;
+
+    /** Range start offset (divisible by block size). */
+    private long startOff;
+
+    /** Range end offset (endOff + 1 divisible by block size). */
+    private long endOff;
+
+    /** Transient flag indicating no further writes should be made to this range. */
+    private boolean done;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public IgfsFileAffinityRange() {
+        // No-op.
+    }
+
+    /**
+     * @param startOff Start offset.
+     * @param endOff End offset.
+     * @param affKey Affinity key.
+     */
+    IgfsFileAffinityRange(long startOff, long endOff, IgniteUuid affKey) {
+        this.startOff = startOff;
+        this.endOff = endOff;
+        this.affKey = affKey;
+    }
+
+    /**
+     * Creates new range with updated status.
+     *
+     * @param other Initial range.
+     * @param status Updated status.
+     */
+    IgfsFileAffinityRange(IgfsFileAffinityRange other, int status) {
+        startOff = other.startOff;
+        endOff = other.endOff;
+        affKey = other.affKey;
+
+        this.status = status;
+    }
+
+    /**
+     * @return Affinity key for this range.
+     */
+    public IgniteUuid affinityKey() {
+        return affKey;
+    }
+
+    /**
+     * @return Range start offset.
+     */
+    public long startOffset() {
+        return startOff;
+    }
+
+    /**
+     * @return Range end offset.
+     */
+    public long endOffset() {
+        return endOff;
+    }
+
+    /**
+     * @param blockStartOff Block start offset to check.
+     * @return {@code True} if block with given start offset belongs to this range.
+     */
+    public boolean belongs(long blockStartOff) {
+        return blockStartOff >= startOff && blockStartOff < endOff;
+    }
+
+    /**
+     * @param blockStartOff Block start offset to check.
+     * @return {@code True} if block with given start offset is located before this range.
+     */
+    public boolean less(long blockStartOff) {
+        return blockStartOff < startOff;
+    }
+
+    /**
+     * @param blockStartOff Block start offset to check.
+     * @return {@code True} if block with given start offset is located after this range.
+     */
+    public boolean greater(long blockStartOff) {
+        return blockStartOff > endOff;
+    }
+
+    /**
+     * @return {@code True} if range is empty, i.e. has zero length.
+     */
+    public boolean empty() {
+        return startOff == endOff;
+    }
+
+    /**
+     * @return Range status.
+     */
+    public int status() {
+        return status;
+    }
+
+    /**
+     * Expands this range by given block.
+     *
+     * @param blockStartOff Offset of block start.
+     * @param expansionSize Block size.
+     */
+    public void expand(long blockStartOff, int expansionSize) {
+        // If we are expanding empty range.
+        if (endOff == startOff) {
+            assert endOff == blockStartOff : "Failed to expand range [endOff=" + endOff +
+                ", blockStartOff=" + blockStartOff + ", expansionSize=" + expansionSize + ']';
+
+            endOff += expansionSize - 1;
+        }
+        else {
+            assert endOff == blockStartOff - 1;
+
+            endOff += expansionSize;
+        }
+    }
+
+    /**
+     * Splits range into a collection of smaller ranges, each no longer than {@code maxSize}.
+     *
+     * @param maxSize Split part maximum size.
+     * @return Collection of range parts.
+     */
+    public Collection<IgfsFileAffinityRange> split(long maxSize) {
+        long len = endOff - startOff + 1;
+
+        if (len > maxSize) {
+            int size = (int)(len / maxSize + 1);
+
+            Collection<IgfsFileAffinityRange> res = new ArrayList<>(size);
+
+            long pos = startOff;
+
+            while (pos < endOff + 1) {
+                long end = Math.min(pos + maxSize - 1, endOff);
+
+                IgfsFileAffinityRange part = new IgfsFileAffinityRange(pos, end, affKey);
+
+                part.status = status;
+
+                res.add(part);
+
+                pos = end + 1;
+            }
+
+            return res;
+        }
+        else
+            return Collections.singletonList(this);
+    }
+
+    /**
+     * Tries to concatenate this range with a given one; returns {@code null} if concatenation is not possible.
+     *
+     * @param range Range to concatenate with.
+     * @return Concatenated range or {@code null} if ranges are not adjacent, keys differ or status is not initial.
+     */
+    @Nullable public IgfsFileAffinityRange concat(IgfsFileAffinityRange range) {
+        if (endOff + 1 != range.startOff || !F.eq(affKey, range.affKey) || status != RANGE_STATUS_INITIAL)
+            return null;
+
+        return new IgfsFileAffinityRange(startOff, range.endOff, affKey);
+    }
+
+    /**
+     * Marks this range as done.
+     */
+    public void markDone() {
+        done = true;
+    }
+
+    /**
+     * @return Done flag.
+     */
+    public boolean done() {
+        return done;
+    }
+
+    /**
+     * Checks if range regions are equal.
+     *
+     * @param other Other range to check against.
+     * @return {@code True} if range regions are equal.
+     */
+    public boolean regionEqual(IgfsFileAffinityRange other) {
+        return startOff == other.startOff && endOff == other.endOff;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeGridUuid(out, affKey);
+
+        out.writeInt(status);
+
+        out.writeLong(startOff);
+        out.writeLong(endOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        affKey = U.readGridUuid(in);
+
+        status = in.readInt();
+
+        startOff = in.readLong();
+        endOff = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public MessageAdapter clone() {
+        IgfsFileAffinityRange _clone = new IgfsFileAffinityRange();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(MessageAdapter _msg) {
+        IgfsFileAffinityRange _clone = (IgfsFileAffinityRange)_msg;
+
+        _clone.affKey = affKey;
+        _clone.status = status;
+        _clone.startOff = startOff;
+        _clone.endOff = endOff;
+        _clone.done = done;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("fallthrough")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        writer.setBuffer(buf);
+
+        if (!typeWritten) {
+            if (!writer.writeByte(null, directType()))
+                return false;
+
+            typeWritten = true;
+        }
+
+        switch (state) {
+            case 0:
+                if (!writer.writeIgniteUuid("affKey", affKey))
+                    return false;
+
+                state++;
+
+            case 1:
+                if (!writer.writeBoolean("done", done))
+                    return false;
+
+                state++;
+
+            case 2:
+                if (!writer.writeLong("endOff", endOff))
+                    return false;
+
+                state++;
+
+            case 3:
+                if (!writer.writeLong("startOff", startOff))
+                    return false;
+
+                state++;
+
+            case 4:
+                if (!writer.writeInt("status", status))
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("fallthrough")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        reader.setBuffer(buf);
+
+        switch (state) {
+            case 0:
+                affKey = reader.readIgniteUuid("affKey");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 1:
+                done = reader.readBoolean("done");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 2:
+                endOff = reader.readLong("endOff");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 3:
+                startOff = reader.readLong("startOff");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 4:
+                status = reader.readInt("status");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 68;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsFileAffinityRange.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java
new file mode 100644
index 0000000..dc1248c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * File or directory information.
+ * <p>
+ * An instance with a positive block size is reported as a file, an instance with a zero block size
+ * as a directory. Equality and hash code are based on the path only.
+ */
+public final class IgfsFileImpl implements IgfsFile, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Path to this file. */
+    private IgfsPath path;
+
+    /**
+     * File id. It is not written by {@link #writeExternal(ObjectOutput)} and therefore stays
+     * {@code null} on an instance restored through {@link #readExternal(ObjectInput)}.
+     */
+    private IgniteUuid fileId;
+
+    /** Block size, {@code zero} for directories. */
+    private int blockSize;
+
+    /** Group block size. */
+    private long grpBlockSize;
+
+    /** File length. */
+    private long len;
+
+    /** Last access time. */
+    private long accessTime;
+
+    /** Last modification time. */
+    private long modificationTime;
+
+    /** Properties, never {@code null} once the instance is constructed or deserialized. */
+    private Map<String, String> props;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public IgfsFileImpl() {
+        // No-op.
+    }
+
+    /**
+     * Constructs file or directory instance from the given file info.
+     *
+     * @param path Path.
+     * @param info File info to take ID, sizes, properties and timestamps from.
+     * @param globalGrpBlockSize Group block size to report when the file has no affinity key or is empty.
+     */
+    public IgfsFileImpl(IgfsPath path, IgfsFileInfo info, long globalGrpBlockSize) {
+        A.notNull(path, "path");
+        A.notNull(info, "info");
+
+        this.path = path;
+        fileId = info.id();
+
+        // Block size, length and group block size keep their zero defaults for directories.
+        if (info.isFile()) {
+            blockSize = info.blockSize();
+            len = info.length();
+
+            // A file with an affinity key and non-zero length reports its own length as group block size.
+            grpBlockSize = info.affinityKey() == null || len == 0 ? globalGrpBlockSize : len;
+        }
+
+        props = nonNullProperties(info.properties());
+
+        accessTime = info.accessTime();
+        modificationTime = info.modificationTime();
+    }
+
+    /**
+     * Constructs file or directory instance from the given listing entry.
+     *
+     * @param path Path.
+     * @param entry Listing entry.
+     * @param globalGrpSize Group block size to report when the entry has no affinity key or zero length.
+     */
+    public IgfsFileImpl(IgfsPath path, IgfsListingEntry entry, long globalGrpSize) {
+        A.notNull(path, "path");
+        A.notNull(entry, "entry");
+
+        this.path = path;
+        fileId = entry.fileId();
+
+        blockSize = entry.blockSize();
+        len = entry.length();
+
+        // An entry with an affinity key and non-zero length reports its own length as group block size.
+        grpBlockSize = entry.affinityKey() == null || len == 0 ? globalGrpSize : len;
+
+        // Guard against a null map in the same way the file-info constructor does.
+        props = nonNullProperties(entry.properties());
+
+        accessTime = entry.accessTime();
+        modificationTime = entry.modificationTime();
+    }
+
+    /**
+     * Substitutes an empty map for a missing properties map so that property look-ups never hit {@code null}.
+     *
+     * @param props Properties, possibly {@code null}.
+     * @return Passed map or an immutable empty map if {@code null} was passed.
+     */
+    private static Map<String, String> nonNullProperties(@Nullable Map<String, String> props) {
+        return props == null ? Collections.<String, String>emptyMap() : props;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPath path() {
+        return path;
+    }
+
+    /**
+     * @return File ID, {@code null} if this instance was restored through {@link #readExternal(ObjectInput)}.
+     */
+    public IgniteUuid fileId() {
+        return fileId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isFile() {
+        return blockSize > 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDirectory() {
+        return blockSize == 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long length() {
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int blockSize() {
+        return blockSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long groupBlockSize() {
+        return grpBlockSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long accessTime() {
+        return accessTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long modificationTime() {
+        return modificationTime;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws IllegalArgumentException If there is no property with the given name.
+     */
+    @Override public String property(String name) throws IllegalArgumentException {
+        String val = props.get(name);
+
+        if (val == null)
+            throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
+
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String property(String name, @Nullable String dfltVal) {
+        String val = props.get(name);
+
+        return val == null ? dfltVal : val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, String> properties() {
+        return props;
+    }
+
+    /**
+     * Writes object to data output. The file ID is not part of the written state.
+     *
+     * @param out Data output.
+     * @throws IOException If write failed.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        path.writeExternal(out);
+
+        // Field order must match readExternal().
+        out.writeInt(blockSize);
+        out.writeLong(grpBlockSize);
+        out.writeLong(len);
+        U.writeStringMap(out, props);
+        out.writeLong(accessTime);
+        out.writeLong(modificationTime);
+    }
+
+    /**
+     * Reads object from data input. The file ID is not part of the stored state and is left untouched.
+     *
+     * @param in Data input.
+     * @throws IOException If read failed.
+     */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        path = new IgfsPath();
+
+        path.readExternal(in);
+
+        // Field order must match writeExternal().
+        blockSize = in.readInt();
+        grpBlockSize = in.readLong();
+        len = in.readLong();
+
+        // Keep the "props is never null" invariant in case the stored map was absent.
+        props = nonNullProperties(U.readStringMap(in));
+
+        accessTime = in.readLong();
+        modificationTime = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return path.hashCode();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Two instances are equal if they are of the same class and their paths are equal.
+     */
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        IgfsFileImpl that = (IgfsFileImpl)o;
+
+        return path.equals(that.path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsFileImpl.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java
new file mode 100644
index 0000000..43def03
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * File or directory information.
+ * <p>
+ * Instances are meant to be treated as unmodifiable: the copy constructors produce updated copies
+ * rather than changing an existing instance. Note that {@link #affinityKey(IgniteUuid)} and
+ * {@link #fileMap(IgfsFileMap)} nevertheless change the instance in place.
+ * <p>
+ * A zero block size marks a directory, a positive block size marks a file.
+ */
+public final class IgfsFileInfo implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** ID for the root directory. */
+    public static final IgniteUuid ROOT_ID = new IgniteUuid(new UUID(0, 0), 0);
+
+    /** ID of the trash directory. */
+    public static final IgniteUuid TRASH_ID = new IgniteUuid(new UUID(0, 1), 0);
+
+    /** Info ID. */
+    private IgniteUuid id;
+
+    /** File length in bytes, {@code zero} for directories. */
+    private long len;
+
+    /** File block size, {@code zero} for directories. */
+    private int blockSize;
+
+    /** File properties, {@code null} if there are none. */
+    private Map<String, String> props;
+
+    /** File lock ID. */
+    private IgniteUuid lockId;
+
+    /** Affinity key used for single-node file collocation. */
+    private IgniteUuid affKey;
+
+    /** File affinity map. */
+    private IgfsFileMap fileMap;
+
+    /** Last access time. Modified on-demand. */
+    private long accessTime;
+
+    /** Last modification time. */
+    private long modificationTime;
+
+    /** Directory listing. */
+    @GridToStringInclude
+    private Map<String, IgfsListingEntry> listing;
+
+    /** Eviction exclude flag: whether data blocks of this entry should be excluded from eviction. */
+    private boolean evictExclude;
+
+    /**
+     * Original file path. This is a helper field used only in some
+     * operations like delete. It is set through {@link Builder#path(IgfsPath)} and is not
+     * copied by any of the constructors.
+     */
+    private IgfsPath path;
+
+    /**
+     * {@link Externalizable} support. Creates a directory info with {@link #ROOT_ID};
+     * also used by {@link #builder()} as the initial state.
+     */
+    public IgfsFileInfo() {
+        this(ROOT_ID);
+    }
+
+    /**
+     * Constructs directory file info with the given ID. Access and modification times are set to
+     * the current time.
+     *
+     * @param id ID.
+     */
+    IgfsFileInfo(IgniteUuid id) {
+        this(true, id, 0, 0, null, null, null, null, false, System.currentTimeMillis(), false);
+    }
+
+    /**
+     * Constructs directory or file info with
+     * {@link org.apache.ignite.configuration.IgfsConfiguration#DFLT_BLOCK_SIZE default} block size
+     * and a randomly generated ID. The passed properties map is stored as is, without copying.
+     *
+     * @param isDir Constructs directory info if {@code true} or file info if {@code false}.
+     * @param props Meta properties to set.
+     */
+    public IgfsFileInfo(boolean isDir, @Nullable Map<String, String> props) {
+        this(isDir, null, isDir ? 0 : IgfsConfiguration.DFLT_BLOCK_SIZE, 0, null, null, props, null, false,
+            System.currentTimeMillis(), false);
+    }
+
+    /**
+     * Constructs directory with random ID and provided listing.
+     *
+     * @param listing Listing.
+     */
+    IgfsFileInfo(Map<String, IgfsListingEntry> listing) {
+        this(true, null, 0, 0, null, listing, null, null, false, System.currentTimeMillis(), false);
+    }
+
+    /**
+     * Constructs empty (zero length) file info with random ID. The properties map is copied.
+     *
+     * @param blockSize Block size.
+     * @param affKey Affinity key.
+     * @param evictExclude Eviction exclude flag.
+     * @param props File properties.
+     */
+    IgfsFileInfo(int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude,
+        @Nullable Map<String, String> props) {
+        this(false, null, blockSize, 0, affKey, null, props, null, true, System.currentTimeMillis(), evictExclude);
+    }
+
+    /**
+     * Constructs file info with random ID. The properties map is copied.
+     *
+     * @param blockSize Block size.
+     * @param len Length.
+     * @param affKey Affinity key.
+     * @param lockId Lock ID.
+     * @param evictExclude Evict exclude flag.
+     * @param props Properties.
+     */
+    public IgfsFileInfo(int blockSize, long len, @Nullable IgniteUuid affKey, @Nullable IgniteUuid lockId,
+        boolean evictExclude, @Nullable Map<String, String> props) {
+        this(false, null, blockSize, len, affKey, null, props, lockId, true, System.currentTimeMillis(), evictExclude);
+    }
+
+    /**
+     * Constructs a copy of the given file information with a different length.
+     *
+     * @param info File information to copy data from.
+     * @param len Size of a file.
+     */
+    IgfsFileInfo(IgfsFileInfo info, long len) {
+        this(info.isDirectory(), info.id, info.blockSize, len, info.affKey, info.listing, info.props, info.fileMap(),
+            info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude());
+    }
+
+    /**
+     * Constructs a copy of the given file info with different access and modification times.
+     * The properties map is shared with the original, not copied.
+     *
+     * @param info File info.
+     * @param accessTime Last access time.
+     * @param modificationTime Last modification time.
+     */
+    IgfsFileInfo(IgfsFileInfo info, long accessTime, long modificationTime) {
+        this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props,
+            info.fileMap(), info.lockId, false, accessTime, modificationTime, info.evictExclude());
+    }
+
+    /**
+     * Constructs a copy of the given file information with different properties.
+     *
+     * @param info File information to copy data from.
+     * @param props File properties to set.
+     */
+    IgfsFileInfo(IgfsFileInfo info, @Nullable Map<String, String> props) {
+        this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, props,
+            info.fileMap(), info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude());
+    }
+
+    /**
+     * Constructs file info with random ID and without affinity key or lock. The properties map is copied.
+     *
+     * @param blockSize Block size.
+     * @param len Size of a file.
+     * @param evictExclude Evict exclude flag.
+     * @param props File properties to set.
+     */
+    IgfsFileInfo(int blockSize, long len, boolean evictExclude, @Nullable Map<String, String> props) {
+        this(false, null, blockSize, len, null, null, props, null, true, System.currentTimeMillis(), evictExclude);
+    }
+
+    /**
+     * Constructs a copy of the given file information with a different lock ID and modification time.
+     *
+     * @param info File information to copy data from.
+     * @param lockId Lock ID.
+     * @param modificationTime Last modification time.
+     */
+    IgfsFileInfo(IgfsFileInfo info, @Nullable IgniteUuid lockId, long modificationTime) {
+        this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props,
+            info.fileMap(), lockId, true, info.accessTime, modificationTime, info.evictExclude());
+    }
+
+    /**
+     * Constructs a copy of the given file info with a different directory listing.
+     * The properties map is shared with the original, not copied.
+     *
+     * @param listing New directory listing.
+     * @param old Old file info.
+     */
+    IgfsFileInfo(Map<String, IgfsListingEntry> listing, IgfsFileInfo old) {
+        this(old.isDirectory(), old.id, old.blockSize, old.len, old.affKey, listing, old.props, old.fileMap(),
+            old.lockId, false, old.accessTime, old.modificationTime, old.evictExclude());
+    }
+
+    /**
+     * Constructs file info without a file map. The given modification time is used as the access
+     * time as well.
+     *
+     * @param isDir Constructs directory info if {@code true} or file info if {@code false}.
+     * @param id ID or {@code null} to generate it automatically.
+     * @param blockSize Block size.
+     * @param len Size of a file.
+     * @param affKey Affinity key for data blocks.
+     * @param listing Directory listing.
+     * @param props File properties.
+     * @param lockId Lock ID.
+     * @param cpProps Flag to copy properties map.
+     * @param modificationTime Last modification time, also used as last access time.
+     * @param evictExclude Evict exclude flag.
+     */
+    private IgfsFileInfo(boolean isDir, @Nullable IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey,
+        @Nullable Map<String, IgfsListingEntry> listing, @Nullable Map<String, String> props,
+        @Nullable IgniteUuid lockId, boolean cpProps, long modificationTime, boolean evictExclude) {
+        this(isDir, id, blockSize, len, affKey, listing, props, null, lockId, cpProps, modificationTime,
+            modificationTime, evictExclude);
+    }
+
+    /**
+     * Constructs file info. All other constructors delegate here.
+     *
+     * @param isDir Constructs directory info if {@code true} or file info if {@code false}.
+     * @param id ID or {@code null} to generate it automatically.
+     * @param blockSize Block size.
+     * @param len Size of a file.
+     * @param affKey Affinity key for data blocks.
+     * @param listing Directory listing.
+     * @param props File properties.
+     * @param fileMap File map, a new empty one is created for files if {@code null} is passed.
+     * @param lockId Lock ID.
+     * @param cpProps Flag to copy properties map.
+     * @param accessTime Last access time.
+     * @param modificationTime Last modification time.
+     * @param evictExclude Evict exclude flag.
+     */
+    private IgfsFileInfo(boolean isDir, @Nullable IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey,
+        @Nullable Map<String, IgfsListingEntry> listing, @Nullable Map<String, String> props,
+        @Nullable IgfsFileMap fileMap, @Nullable IgniteUuid lockId, boolean cpProps, long accessTime,
+        long modificationTime, boolean evictExclude) {
+        // Only directories may carry a non-empty listing.
+        assert F.isEmpty(listing) || isDir;
+
+        if (isDir) {
+            assert len == 0 : "Directory length should be zero: " + len;
+            assert blockSize == 0 : "Directory block size should be zero: " + blockSize;
+        }
+        else {
+            assert len >= 0 : "File length cannot be negative: " + len;
+            assert blockSize > 0 : "File block size should be positive: " + blockSize;
+        }
+
+        this.id = id == null ? IgniteUuid.randomUuid() : id;
+        // Length and block size are forced to zero for directories even when assertions are disabled.
+        this.len = isDir ? 0 : len;
+        this.blockSize = isDir ? 0 : blockSize;
+        this.affKey = affKey;
+        this.listing = listing;
+
+        if (fileMap == null && !isDir)
+            fileMap = new IgfsFileMap();
+
+        this.fileMap = fileMap;
+        this.accessTime = accessTime;
+        this.modificationTime = modificationTime;
+
+        // Empty properties are stored as null. A non-empty map is copied only when cpProps is set
+        // (to escape concurrent modifications of the passed map); otherwise the passed map is shared.
+        this.props = props == null || props.isEmpty() ? null :
+            cpProps ? new GridLeanMap<>(props) : props;
+
+        // Directories always get a non-null listing.
+        if (listing == null && isDir)
+            this.listing = Collections.emptyMap();
+
+        this.lockId = lockId;
+        this.evictExclude = evictExclude;
+    }
+
+    /**
+     * A copy constructor, which takes data from the specified object field-by-field.
+     * The properties map is copied; the helper {@code path} field is not carried over.
+     *
+     * @param info An object to copy data from.
+     */
+    public IgfsFileInfo(IgfsFileInfo info) {
+        this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props,
+            info.fileMap(), info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude());
+    }
+
+    /**
+     * Creates a builder for the new instance of file info. The instance being built starts as
+     * the one produced by the no-argument constructor.
+     *
+     * @return A builder to construct a new unmodifiable instance
+     *         of this class.
+     */
+    public static Builder builder() {
+        return new Builder(new IgfsFileInfo());
+    }
+
+    /**
+     * Creates a builder for the new instance of file info,
+     * based on the specified origin.
+     *
+     * @param origin An origin for new instance, from which
+     *               the data will be copied.
+     * @return A builder to construct a new unmodifiable instance
+     *         of this class.
+     */
+    public static Builder builder(IgfsFileInfo origin) {
+        return new Builder(new IgfsFileInfo(origin));
+    }
+
+    /**
+     * Gets this item ID.
+     *
+     * @return This item ID.
+     */
+    public IgniteUuid id() {
+        return id;
+    }
+
+    /**
+     * @return {@code True} if this is a file, i.e. block size is positive.
+     */
+    public boolean isFile() {
+        return blockSize > 0;
+    }
+
+    /**
+     * @return {@code True} if this is a directory, i.e. block size is zero.
+     */
+    public boolean isDirectory() {
+        return blockSize == 0;
+    }
+
+    /**
+     * Get file size. Must be called for files only (checked by an assertion).
+     *
+     * @return File size.
+     */
+    public long length() {
+        assert isFile();
+
+        return len;
+    }
+
+    /**
+     * Get single data block size to store this file. Must be called for files only
+     * (checked by an assertion).
+     *
+     * @return Single data block size to store this file.
+     */
+    public int blockSize() {
+        assert isFile();
+
+        return blockSize;
+    }
+
+    /**
+     * Must be called for files only (checked by an assertion).
+     *
+     * @return Number of data blocks to store this file.
+     */
+    public long blocksCount() {
+        assert isFile();
+
+        // Length divided by block size, rounded up.
+        return (len + blockSize() - 1) / blockSize();
+    }
+
+    /**
+     * @return Last access time.
+     */
+    public long accessTime() {
+        return accessTime;
+    }
+
+    /**
+     * @return Last modification time.
+     */
+    public long modificationTime() {
+        return modificationTime;
+    }
+
+    /**
+     * @return Unmodifiable view of the directory listing, empty map for files.
+     */
+    public Map<String, IgfsListingEntry> listing() {
+        // Always wrap into unmodifiable map to be able to avoid illegal modifications in other pieces of the code.
+        if (isFile())
+            return Collections.unmodifiableMap(Collections.<String, IgfsListingEntry>emptyMap());
+
+        assert listing != null;
+
+        return Collections.unmodifiableMap(listing);
+    }
+
+    /**
+     * @return Affinity key used for single-node file collocation. If {@code null}, usual
+     *      mapper procedure is used for block affinity detection.
+     */
+    @Nullable public IgniteUuid affinityKey() {
+        return affKey;
+    }
+
+    /**
+     * Changes the affinity key of this instance in place.
+     *
+     * @param affKey Affinity key used for single-node file collocation.
+     */
+    public void affinityKey(IgniteUuid affKey) {
+        this.affKey = affKey;
+    }
+
+    /**
+     * @return File affinity map.
+     */
+    public IgfsFileMap fileMap() {
+        return fileMap;
+    }
+
+    /**
+     * Changes the file affinity map of this instance in place.
+     *
+     * @param fileMap File affinity map.
+     */
+    public void fileMap(IgfsFileMap fileMap) {
+        this.fileMap = fileMap;
+    }
+
+    /**
+     * Get properties of the file.
+     *
+     * @return Unmodifiable properties of the file, empty map if there are none (never {@code null}).
+     */
+    public Map<String, String> properties() {
+        return props == null || props.isEmpty() ? Collections.<String, String>emptyMap() :
+            Collections.unmodifiableMap(props);
+    }
+
+    /**
+     * Get lock ID.
+     *
+     * @return Lock ID if file is locked or {@code null} if file is free of locks.
+     */
+    @Nullable public IgniteUuid lockId() {
+        return lockId;
+    }
+
+    /**
+     * Get evict exclude flag.
+     *
+     * @return Evict exclude flag.
+     */
+    public boolean evictExclude() {
+        return evictExclude;
+    }
+
+    /**
+     * @return Original file path. This is a helper field used only in some operations like delete.
+     */
+    public IgfsPath path() {
+        return path;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Field order must match {@link #readExternal(ObjectInput)}.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeGridUuid(out, id);
+        out.writeInt(blockSize);
+        out.writeLong(len);
+        U.writeStringMap(out, props);
+        U.writeGridUuid(out, lockId);
+        U.writeGridUuid(out, affKey);
+        out.writeObject(listing);
+        out.writeObject(fileMap);
+        out.writeLong(accessTime);
+        out.writeLong(modificationTime);
+        out.writeBoolean(evictExclude);
+        out.writeObject(path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Field order must match {@link #writeExternal(ObjectOutput)}.
+     */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        id = U.readGridUuid(in);
+        blockSize = in.readInt();
+        len = in.readLong();
+        props = U.readStringMap(in);
+        lockId = U.readGridUuid(in);
+        affKey = U.readGridUuid(in);
+        listing = (Map<String, IgfsListingEntry>)in.readObject();
+        fileMap = (IgfsFileMap)in.readObject();
+        accessTime = in.readLong();
+        modificationTime = in.readLong();
+        evictExclude = in.readBoolean();
+        path = (IgfsPath)in.readObject();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Combines ID, block size, length, properties and lock ID. The affinity key, which
+     * {@link #equals(Object)} compares, is not part of the hash.
+     */
+    @Override public int hashCode() {
+        return id.hashCode() ^ blockSize ^ (int)(len ^ (len >>> 32)) ^ (props == null ? 0 : props.hashCode()) ^
+            (lockId == null ? 0 : lockId.hashCode());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Compares ID, block size, length, affinity key, properties and lock ID. Listing, file map,
+     * timestamps, evict exclude flag and path do not take part in the comparison.
+     */
+    @Override public boolean equals(Object obj) {
+        if (obj == this)
+            return true;
+
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+
+        IgfsFileInfo that = (IgfsFileInfo)obj;
+
+        return id.equals(that.id) && blockSize == that.blockSize && len == that.len && F.eq(affKey, that.affKey) &&
+            F.eq(props, that.props) && F.eq(lockId, that.lockId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsFileInfo.class, this);
+    }
+
+    /**
+     * Builder for {@link IgfsFileInfo}. It modifies the instance passed to its constructor and
+     * returns that same instance from {@link #build()}.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    public static class Builder {
+        /** Instance to build. */
+        private final IgfsFileInfo info;
+
+        /**
+         * Private constructor.
+         *
+         * @param info Instance to build.
+         */
+        private Builder(IgfsFileInfo info) {
+            this.info = info;
+        }
+
+        /**
+         * @param path A new path value.
+         * @return This builder instance (for chaining).
+         */
+        public Builder path(IgfsPath path) {
+            info.path = path;
+
+            return this;
+        }
+
+        /**
+         * Finishes instance construction and returns a resulting
+         * unmodifiable instance.
+         *
+         * @return A constructed instance.
+         */
+        public IgfsFileInfo build() {
+            return info;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java
new file mode 100644
index 0000000..8f16ff2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange.*;
+
+/**
+ * Auxiliary class that is responsible for managing file affinity keys allocation by ranges.
+ */
+public class IgfsFileMap implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Sorted list of ranges in ascending order, or {@code null} when the map holds no ranges. */
+    @GridToStringInclude
+    private List<IgfsFileAffinityRange> ranges;
+
+    /**
+     * Empty constructor. Leaves the range list unset ({@code null}), which the rest of this class
+     * treats as an empty map. A public no-arg constructor is required by {@link Externalizable}.
+     */
+    public IgfsFileMap() {
+        // No-op.
+    }
+
+    /**
+     * Copy constructor: creates a map holding the same ranges as the one passed in. The list of ranges
+     * is copied, the range objects themselves are shared with the old map.
+     *
+     * @param old Old map. If it is {@code null} or holds no ranges, an empty map is created.
+     */
+    public IgfsFileMap(@Nullable IgfsFileMap old) {
+        List<IgfsFileAffinityRange> src = old == null ? null : old.ranges;
+
+        if (src != null)
+            ranges = new ArrayList<>(src);
+    }
+
+    /**
+     * Gets affinity key from file map based on block start offset.
+     * <p>
+     * The first and the last ranges are checked directly, the remaining ranges are located with
+     * a binary search over the sorted range list.
+     *
+     * @param blockOff Block start offset (divisible by block size).
+     * @param includeMoved If {@code true} then will return affinity key for ranges marked as moved.
+     *      Otherwise will return null for such ranges.
+     * @return Affinity key of the range the offset belongs to, or {@code null} if the map is empty,
+     *      no range contains the offset, or the range is moved and {@code includeMoved} is {@code false}.
+     */
+    public IgniteUuid affinityKey(long blockOff, boolean includeMoved) {
+        if (ranges == null)
+            return null;
+
+        assert !ranges.isEmpty();
+
+        // Range binary search.
+        int leftIdx = 0, rightIdx = ranges.size() - 1;
+
+        IgfsFileAffinityRange leftRange = ranges.get(leftIdx);
+        IgfsFileAffinityRange rightRange = ranges.get(rightIdx);
+
+        // If block offset is less than start of first range, we don't have affinity key.
+        if (leftRange.less(blockOff))
+            return null;
+
+        if (leftRange.belongs(blockOff))
+            return rangeAffinityKey(leftRange, includeMoved);
+
+        // If block offset is greater than end of last range, we don't have affinity key either.
+        if (rightRange.greater(blockOff))
+            return null;
+
+        if (rightRange.belongs(blockOff))
+            return rangeAffinityKey(rightRange, includeMoved);
+
+        while (rightIdx - leftIdx > 1) {
+            int midIdx = (leftIdx + rightIdx) >>> 1;
+
+            IgfsFileAffinityRange midRange = ranges.get(midIdx);
+
+            if (midRange.belongs(blockOff))
+                return rangeAffinityKey(midRange, includeMoved);
+
+            // If offset is less then block start, update right index.
+            if (midRange.less(blockOff))
+                rightIdx = midIdx;
+            else {
+                assert midRange.greater(blockOff);
+
+                leftIdx = midIdx;
+            }
+        }
+
+        // Range was not found.
+        return null;
+    }
+
+    /**
+     * Resolves affinity key of the range that was found for an offset. The key is always taken from
+     * the matched range itself (previously the key of the first range in the map was returned for
+     * moved ranges matched at the end or in the middle of the map).
+     *
+     * @param range Range the offset belongs to.
+     * @param includeMoved Whether affinity key should be returned for a range in moved status.
+     * @return Affinity key of the range, or {@code null} if the range is moved and moved ranges
+     *      are not included.
+     */
+    private static IgniteUuid rangeAffinityKey(IgfsFileAffinityRange range, boolean includeMoved) {
+        return range.status() != RANGE_STATUS_MOVED || includeMoved ? range.affinityKey() : null;
+    }
+
+    /**
+     * Changes status of a range in the file map. The map range is looked up by start offset: the last
+     * and the first ranges are probed first, then a binary search is run over the rest. When the given
+     * range covers only the beginning of the map range, the map range gets split in two.
+     *
+     * @param range Range to update status.
+     * @param status New range status.
+     * @throws IgniteCheckedException If the map is empty or range was not found.
+     */
+    public void updateRangeStatus(IgfsFileAffinityRange range, int status) throws IgniteCheckedException {
+        if (ranges == null)
+            throw new IgfsInvalidRangeException("Failed to update range status (file map is empty) " +
+                "[range=" + range + ", ranges=" + ranges + ']');
+
+        assert !ranges.isEmpty();
+
+        long startOff = range.startOffset();
+
+        // Probe the tail of the list.
+        int hi = ranges.size() - 1;
+
+        IgfsFileAffinityRange tail = ranges.get(hi);
+
+        if (tail.startOffset() == startOff) {
+            updateRangeStatus0(hi, tail, range, status);
+
+            return;
+        }
+
+        // Probe the head of the list.
+        int lo = 0;
+
+        IgfsFileAffinityRange head = ranges.get(lo);
+
+        if (head.startOffset() == startOff) {
+            updateRangeStatus0(lo, head, range, status);
+
+            return;
+        }
+
+        // Narrow the (lo, hi) window until no unchecked range is left in between.
+        while (hi - lo > 1) {
+            int mid = (lo + hi) / 2;
+
+            IgfsFileAffinityRange cur = ranges.get(mid);
+
+            if (cur.startOffset() == startOff) {
+                updateRangeStatus0(mid, cur, range, status);
+
+                return;
+            }
+
+            if (cur.less(startOff))
+                // Searched offset lies before the middle range.
+                hi = mid;
+            else {
+                assert cur.greater(startOff);
+
+                lo = mid;
+            }
+        }
+
+        throw new IgfsInvalidRangeException("Failed to update map for range (corresponding map range " +
+            "was not found) [range=" + range + ", status=" + status + ", ranges=" + ranges + ']');
+    }
+
+    /**
+     * Deletes range from map. The map range is matched by region equality: the last and the first
+     * ranges are checked first, then a binary search is run over the rest. The matched range is
+     * asserted to be in moved status. When the last remaining range is deleted the map becomes empty.
+     *
+     * @param range Range to delete.
+     * @throws IgniteCheckedException If the map is empty or the corresponding map range was not found.
+     */
+    public void deleteRange(IgfsFileAffinityRange range) throws IgniteCheckedException {
+        if (ranges == null)
+            throw new IgfsInvalidRangeException("Failed to remove range (file map is empty) " +
+                "[range=" + range + ", ranges=" + ranges + ']');
+
+        assert !ranges.isEmpty();
+
+        try {
+            // Check last.
+            int lastIdx = ranges.size() - 1;
+
+            IgfsFileAffinityRange last = ranges.get(lastIdx);
+
+            if (last.regionEqual(range)) {
+                assert last.status() == RANGE_STATUS_MOVED;
+
+                // Remove by index: exact position is known, no equals-based scan of the list is needed.
+                ranges.remove(lastIdx);
+
+                return;
+            }
+
+            // Check first.
+            int firstIdx = 0;
+
+            IgfsFileAffinityRange first = ranges.get(firstIdx);
+
+            if (first.regionEqual(range)) {
+                assert first.status() == RANGE_STATUS_MOVED;
+
+                // Remove by index, same as for the last and the middle ranges.
+                ranges.remove(firstIdx);
+
+                return;
+            }
+
+            // Binary search.
+            while (lastIdx - firstIdx > 1) {
+                int midIdx = (firstIdx + lastIdx) / 2;
+
+                IgfsFileAffinityRange midRange = ranges.get(midIdx);
+
+                if (midRange.regionEqual(range)) {
+                    assert midRange.status() == RANGE_STATUS_MOVED;
+
+                    ranges.remove(midIdx);
+
+                    return;
+                }
+
+                // If range we are looking for is less
+                if (midRange.less(range.startOffset()))
+                    lastIdx = midIdx;
+                else {
+                    assert midRange.greater(range.startOffset());
+
+                    firstIdx = midIdx;
+                }
+            }
+        }
+        finally {
+            // Empty map is always represented by a null list.
+            if (ranges.isEmpty())
+                ranges = null;
+        }
+
+        throw new IgfsInvalidRangeException("Failed to remove range from file map (corresponding map range " +
+            "was not found) [range=" + range + ", ranges=" + ranges + ']');
+    }
+
+    /**
+     * Applies new status to the map range at the given position. If the updated range does not cover
+     * the whole original region, the original range is split: the updated part takes the original
+     * position and the remainder is inserted right after it with the original affinity key.
+     *
+     * @param origIdx Original range index.
+     * @param orig Original range at index.
+     * @param update Range being updated.
+     * @param status New status for range.
+     */
+    private void updateRangeStatus0(int origIdx, IgfsFileAffinityRange orig, IgfsFileAffinityRange update,
+        int status) {
+        assert F.eq(orig.affinityKey(), update.affinityKey());
+        assert ranges.get(origIdx) == orig;
+
+        boolean split = !orig.regionEqual(update);
+
+        // A partial update must end before the original range does.
+        assert !split || orig.endOffset() > update.endOffset();
+
+        ranges.set(origIdx, new IgfsFileAffinityRange(update, status));
+
+        if (split)
+            ranges.add(origIdx + 1, new IgfsFileAffinityRange(update.endOffset() + 1, orig.endOffset(),
+                orig.affinityKey()));
+    }
+
+    /**
+     * Gets all ranges currently held by this map as a read-only list.
+     *
+     * @return Unmodifiable list of ranges (empty list if the map holds no ranges).
+     */
+    public List<IgfsFileAffinityRange> ranges() {
+        return ranges == null ? Collections.<IgfsFileAffinityRange>emptyList() :
+            Collections.unmodifiableList(ranges);
+    }
+
+    /**
+     * Appends a range to this map. Ranges can only be appended to the end of the map: the added range
+     * must start after the last existing range. When the last range and the added one can be
+     * concatenated, the last range is replaced with the concatenation instead of adding a new entry.
+     * {@code null} and empty ranges are ignored.
+     *
+     * @param range Range to add.
+     */
+    public void addRange(IgfsFileAffinityRange range) {
+        if (range == null || range.empty())
+            return;
+
+        if (ranges != null) {
+            assert !ranges.isEmpty();
+
+            int lastIdx = ranges.size() - 1;
+
+            IgfsFileAffinityRange last = ranges.get(lastIdx);
+
+            // We cannot add range in the middle of the file.
+            assert last.greater(range.startOffset()) : "Cannot add range to middle of map [last=" + last +
+                ", range=" + range + ']';
+
+            IgfsFileAffinityRange merged = last.concat(range);
+
+            if (merged != null)
+                // Ranges were concatenated, the result takes place of the last range.
+                ranges.set(lastIdx, merged);
+            else
+                ranges.add(range);
+        }
+        else {
+            // Very first range of the map.
+            ranges = new ArrayList<>();
+
+            ranges.add(range);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        if (ranges == null)
+            out.writeInt(-1);
+        else {
+            assert !ranges.isEmpty();
+
+            out.writeInt(ranges.size());
+
+            for (IgfsFileAffinityRange range : ranges)
+                out.writeObject(range);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        int size = in.readInt();
+
+        // Non-positive size means that no ranges were written, the list is left untouched.
+        if (size <= 0)
+            return;
+
+        ranges = new ArrayList<>(size);
+
+        for (int left = size; left > 0; left--)
+            ranges.add((IgfsFileAffinityRange)in.readObject());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsFileMap.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java
new file mode 100644
index 0000000..f1afe3d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * IGFS file worker for DUAL modes.
+ */
+public class IgfsFileWorker extends IgfsThread {
+    /** Time during which thread remains alive since its last batch is finished. */
+    private static final long THREAD_REUSE_WAIT_TIME = 5000;
+
+    /** Lock */
+    private final Lock lock = new ReentrantLock();
+
+    /** Condition. */
+    private final Condition cond = lock.newCondition();
+
+    /** Next queued batch. */
+    private IgfsFileWorkerBatch nextBatch;
+
+    /** Batch which is currently being processed. */
+    private IgfsFileWorkerBatch curBatch;
+
+    /** Cancellation flag. */
+    private volatile boolean cancelled;
+
+    /**
+     * Creates {@code IGFS} file worker.
+     *
+     * @param name Worker name, passed as is to the parent class constructor.
+     */
+    IgfsFileWorker(String name) {
+        super(name);
+    }
+
+    /**
+     * Queues a batch as the next one to be processed by this worker and wakes the worker up.
+     * A cancelled worker does not accept batches.
+     *
+     * @param batch Batch to add.
+     * @return {@code True} if the batch was actually added.
+     */
+    boolean addBatch(IgfsFileWorkerBatch batch) {
+        assert batch != null;
+
+        lock.lock();
+
+        try {
+            if (cancelled)
+                return false;
+
+            assert nextBatch == null; // Remember, that write operations on a single file are exclusive.
+
+            nextBatch = batch;
+
+            // Wake up the main loop which may be waiting for a new batch.
+            cond.signalAll();
+
+            return true;
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Main loop of the worker: picks the queued batch under the lock, processes it outside of the lock
+     * and marks the worker as cancelled once a wait for the next batch ends without any batch queued.
+     */
+    @Override protected void body() throws InterruptedException {
+        while (!cancelled) {
+            lock.lock();
+
+            try {
+                // If there are no more new batches, wait for several seconds before shutting down the thread.
+                if (!cancelled && nextBatch == null)
+                    cond.await(THREAD_REUSE_WAIT_TIME, TimeUnit.MILLISECONDS);
+
+                // Promote the queued batch (if any) to the current one and free the queue slot.
+                curBatch = nextBatch;
+
+                nextBatch = null;
+
+                if (cancelled && curBatch != null)
+                    curBatch.finish(); // Mark the batch as finished if cancelled.
+            }
+            finally {
+                lock.unlock();
+            }
+
+            // Processing is done outside of the lock so that addBatch(), cancel() and currentBatch()
+            // are not blocked while the batch is running.
+            if (curBatch != null)
+                curBatch.process();
+            else {
+                lock.lock();
+
+                try {
+                    // No more new batches, we can safely release the worker as it was inactive for too long.
+                    // The check is repeated under the lock since a batch could have been added in between.
+                    if (nextBatch == null)
+                        cancelled = true;
+                }
+                finally {
+                    lock.unlock();
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cleanup() {
+        assert cancelled; // Cleanup can only be performed on a cancelled worker.
+
+        // Remember the interrupted status and clear it for the time of the final processing.
+        boolean wasInterrupted = interrupted();
+
+        // A batch may still be queued if the main loop has exited before picking it up.
+        if (nextBatch != null)
+            nextBatch.process();
+
+        onFinish();
+
+        // Restore the interrupted status cleared above.
+        if (wasInterrupted)
+            interrupt();
+    }
+
+    /**
+     * Forcefully finish execution of all batches: marks the worker as cancelled, marks both the
+     * current and the queued batch (if present) as finished and wakes up the main loop.
+     */
+    void cancel() {
+        lock.lock();
+
+        try {
+            // From now on addBatch() rejects new batches.
+            cancelled = true;
+
+            if (curBatch != null)
+                curBatch.finish();
+
+            if (nextBatch != null)
+                nextBatch.finish();
+
+            cond.signalAll(); // Awake the main loop in case it is still waiting for the next batch.
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Gets the most recent batch known to this worker: the queued batch if there is one,
+     * otherwise the batch that was picked up for processing last.
+     *
+     * @return Current batch, possibly {@code null}.
+     */
+    IgfsFileWorkerBatch currentBatch() {
+        lock.lock();
+
+        try {
+            return nextBatch != null ? nextBatch : curBatch;
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Callback invoked when worker has processed all its batches. It is called from {@link #cleanup()}
+     * after the last queued batch (if any) was processed. Default implementation does nothing.
+     */
+    protected void onFinish() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
new file mode 100644
index 0000000..27f9b7d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Work batch is an abstraction of the logically grouped tasks.
+ */
+public class IgfsFileWorkerBatch {
+    /** Completion latch. */
+    private final CountDownLatch completeLatch = new CountDownLatch(1);
+
+    /** Finish guard. */
+    private final AtomicBoolean finishGuard = new AtomicBoolean();
+
+    /** Lock for finish operation. */
+    private final ReadWriteLock finishLock = new ReentrantReadWriteLock();
+
+    /** Tasks queue. */
+    private final BlockingDeque<IgfsFileWorkerTask> queue = new LinkedBlockingDeque<>();
+
+    /** Path to the file in the primary file system. */
+    private final IgfsPath path;
+
+    /** Output stream to the file. */
+    private final OutputStream out;
+
+    /** Caught exception. */
+    private volatile IgniteCheckedException err;
+
+    /** Last task marker. */
+    private boolean lastTask;
+
+    /**
+     * Creates a batch bound to the given file and its output stream.
+     *
+     * @param path Path to the file in the primary file system.
+     * @param out Output stream opened to that file.
+     */
+    IgfsFileWorkerBatch(IgfsPath path, OutputStream out) {
+        assert path != null;
+
+        this.path = path;
+
+        assert out != null;
+
+        this.out = out;
+    }
+
+    /**
+     * Enqueues a task which writes the given data to the output stream of this batch.
+     *
+     * @param data Data to be written.
+     * @return {@code True} in case operation was enqueued.
+     */
+    boolean write(final byte[] data) {
+        IgfsFileWorkerTask writeTask = new IgfsFileWorkerTask() {
+            @Override public void execute() throws IgniteCheckedException {
+                try {
+                    out.write(data);
+                }
+                catch (IOException e) {
+                    // Convert to the checked exception type expected by the batch processing loop.
+                    throw new IgniteCheckedException("Failed to write data to the file due to secondary file system " +
+                        "exception: " + path, e);
+                }
+            }
+        };
+
+        return addTask(writeTask);
+    }
+
+    /**
+     * Processes the batch: executes queued tasks one by one until the last task marker is set, a task
+     * fails or the thread is interrupted. In any case the completion callback is invoked, the output
+     * stream is closed and the completion latch is released afterwards.
+     */
+    void process() {
+        try {
+            boolean done = false;
+
+            do {
+                try {
+                    // Poll with timeout, an empty queue simply leads to the next iteration.
+                    IgfsFileWorkerTask task = queue.poll(1000, TimeUnit.MILLISECONDS);
+
+                    if (task != null) {
+                        task.execute();
+
+                        // The marker is raised by the terminating task added in finish().
+                        done = lastTask;
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    // Remember the failure so that it can be re-thrown from await().
+                    err = e;
+
+                    done = true;
+                }
+                catch (InterruptedException ignore) {
+                    Thread.currentThread().interrupt();
+
+                    done = true;
+                }
+            }
+            while (!done);
+        }
+        finally {
+            try {
+                onComplete();
+            }
+            finally {
+                U.closeQuiet(out);
+
+                completeLatch.countDown();
+            }
+        }
+    }
+
+    /**
+     * Marks the batch as finished by enqueueing the terminating task, which makes {@link #process()}
+     * stop and release all the resources. Only the first call has an effect.
+     */
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+    void finish() {
+        // Only the first caller gets to enqueue the terminating task.
+        if (!finishGuard.compareAndSet(false, true))
+            return;
+
+        // Write lock waits for concurrent addTask() calls (holding read lock) to complete.
+        finishLock.writeLock().lock();
+
+        try {
+            IgfsFileWorkerTask terminator = new IgfsFileWorkerTask() {
+                @Override public void execute() {
+                    assert queue.isEmpty();
+
+                    lastTask = true;
+                }
+            };
+
+            queue.add(terminator);
+        }
+        finally {
+            finishLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Blocks until processing of this batch is complete and re-throws the error caught during
+     * processing, if there was one.
+     *
+     * @throws IgniteCheckedException In case any exception has occurred during batch tasks processing
+     *      or the waiting thread was interrupted.
+     */
+    void await() throws IgniteCheckedException {
+        try {
+            completeLatch.await();
+        }
+        catch (InterruptedException e) {
+            // Keep the interrupted status for the callers up the stack.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+
+        // The field is read once so that the check and the throw deal with the same value.
+        IgniteCheckedException failure = err;
+
+        if (failure != null)
+            throw failure;
+    }
+
+    /**
+     * Waits for the batch completion, but only if the batch has already been marked as finished.
+     * Returns immediately otherwise.
+     *
+     * @throws IgniteCheckedException In case any exception has occurred during batch tasks processing.
+     */
+    void awaitIfFinished() throws IgniteCheckedException {
+        if (!finishGuard.get())
+            return;
+
+        await();
+    }
+
+    /**
+     * Get primary file system path this batch was created with.
+     *
+     * @return Primary file system path.
+     */
+    IgfsPath path() {
+        return path;
+    }
+
+    /**
+     * Callback invoked by {@link #process()} when batch processing is over, whether it ended normally,
+     * with an error or due to interruption. It is called before the output stream is closed and
+     * before the completion latch is released. Default implementation does nothing.
+     */
+    protected void onComplete() {
+        // No-op.
+    }
+
+    /**
+     * Puts a task to the queue unless the batch has already been marked as finished.
+     *
+     * @param task Task to add.
+     * @return {@code True} in case the task was added to the queue, {@code false} if the batch is
+     *      finished or the calling thread was interrupted.
+     */
+    private boolean addTask(IgfsFileWorkerTask task) {
+        // Read lock lets several writers add tasks concurrently while excluding finish().
+        finishLock.readLock().lock();
+
+        try {
+            if (finishGuard.get())
+                return false;
+
+            queue.put(task);
+
+            return true;
+        }
+        catch (InterruptedException ignore) {
+            // Task was not enqueued due to interruption.
+            Thread.currentThread().interrupt();
+
+            return false;
+        }
+        finally {
+            finishLock.readLock().unlock();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java
new file mode 100644
index 0000000..d2b4368
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+
+/**
+ * Unit of work executed as a part of an IGFS file worker batch. Execution is allowed to fail
+ * with a checked exception.
+ */
+public interface IgfsFileWorkerTask {
+    /**
+     * Runs the task.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    void execute() throws IgniteCheckedException;
+}


Mime
View raw message