ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [29/52] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP.
Date Fri, 13 Feb 2015 13:14:32 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServer.java
deleted file mode 100644
index a1170b6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServer.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.igfs.common.*;
-import org.apache.ignite.internal.util.ipc.*;
-import org.apache.ignite.internal.util.ipc.loopback.*;
-import org.apache.ignite.internal.util.ipc.shmem.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.thread.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.spi.IgnitePortProtocol.*;
-
-/**
- * GGFS server. Handles requests passed from GGFS clients.
- */
-public class IgfsServer {
-    /** GGFS context. */
-    private final IgfsContext ggfsCtx;
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    /** GGFS marshaller. */
-    private final IgfsMarshaller marsh;
-
-    /** Endpoint configuration. */
-    private final Map<String,String> endpointCfg;
-
-    /** Server endpoint. */
-    private IpcServerEndpoint srvEndpoint;
-
-    /** Server message handler. */
-    private IgfsServerHandler hnd;
-
-    /** Accept worker. */
-    private AcceptWorker acceptWorker;
-
-    /** Started client workers. */
-    private ConcurrentLinkedDeque8<ClientWorker> clientWorkers = new ConcurrentLinkedDeque8<>();
-
-    /** Flag indicating if this is a management endpoint. */
-    private final boolean mgmt;
-
-    /**
-     * Constructs GGFS server.
-     * @param ggfsCtx GGFS context.
-     * @param endpointCfg Endpoint configuration to start.
-     * @param mgmt Management flag - if true, server is intended to be started for Visor.
-     */
-    public IgfsServer(IgfsContext ggfsCtx, Map<String, String> endpointCfg, boolean mgmt) {
-        assert ggfsCtx != null;
-        assert endpointCfg != null;
-
-        this.endpointCfg = endpointCfg;
-        this.ggfsCtx = ggfsCtx;
-        this.mgmt = mgmt;
-
-        log = ggfsCtx.kernalContext().log(IgfsServer.class);
-
-        marsh = new IgfsMarshaller();
-    }
-
-    /**
-     * Starts this server.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void start() throws IgniteCheckedException {
-        srvEndpoint = IpcServerEndpointDeserializer.deserialize(endpointCfg);
-
-        if (U.isWindows() && srvEndpoint instanceof IpcSharedMemoryServerEndpoint)
-            throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.class.getSimpleName() +
-                " should not be configured on Windows (configure " +
-                IpcServerTcpEndpoint.class.getSimpleName() + ")");
-
-        if (srvEndpoint instanceof IpcServerTcpEndpoint) {
-            IpcServerTcpEndpoint srvEndpoint0 = (IpcServerTcpEndpoint)srvEndpoint;
-
-            srvEndpoint0.setManagement(mgmt);
-
-            if (srvEndpoint0.getHost() == null) {
-                if (mgmt) {
-                    String locHostName = ggfsCtx.kernalContext().config().getLocalHost();
-
-                    try {
-                        srvEndpoint0.setHost(U.resolveLocalHost(locHostName).getHostAddress());
-                    }
-                    catch (IOException e) {
-                        throw new IgniteCheckedException("Failed to resolve local host: " + locHostName, e);
-                    }
-                }
-                else
-                    // Bind non-management endpoint to 127.0.0.1 by default.
-                    srvEndpoint0.setHost("127.0.0.1");
-            }
-        }
-
-        ggfsCtx.kernalContext().resource().injectGeneric(srvEndpoint);
-
-        srvEndpoint.start();
-
-        // IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered.
-        if (srvEndpoint.getPort() >= 0)
-            ggfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());
-
-        hnd = new IgfsIpcHandler(ggfsCtx);
-
-        // Start client accept worker.
-        acceptWorker = new AcceptWorker();
-    }
-
-    /**
-     * Callback that is invoked when kernal is ready.
-     */
-    public void onKernalStart() {
-        // Accept connections only when grid is ready.
-        if (srvEndpoint != null)
-            new IgniteThread(acceptWorker).start();
-    }
-
-    /**
-     * Stops this server.
-     *
-     * @param cancel Cancel flag.
-     */
-    public void stop(boolean cancel) {
-        // Skip if did not start.
-        if (srvEndpoint == null)
-            return;
-
-        // Stop accepting new client connections.
-        U.cancel(acceptWorker);
-
-        U.join(acceptWorker, log);
-
-        // Stop server handler, no more requests on existing connections will be processed.
-        try {
-            hnd.stop();
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to stop GGFS server handler (will close client connections anyway).", e);
-        }
-
-        // Stop existing client connections.
-        for (ClientWorker worker : clientWorkers)
-            U.cancel(worker);
-
-        U.join(clientWorkers, log);
-
-        // IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered.
-        if (srvEndpoint.getPort() >= 0)
-            ggfsCtx.kernalContext().ports().deregisterPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());
-
-        try {
-            ggfsCtx.kernalContext().resource().cleanupGeneric(srvEndpoint);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to cleanup server endpoint.", e);
-        }
-    }
-
-    /**
-     * Gets IPC server endpoint.
-     *
-     * @return IPC server endpoint.
-     */
-    public IpcServerEndpoint getIpcServerEndpoint() {
-        return srvEndpoint;
-    }
-
-    /**
-     * Client reader thread.
-     */
-    private class ClientWorker extends GridWorker {
-        /** Connected client endpoint. */
-        private IpcEndpoint endpoint;
-
-        /** Data output stream. */
-        private final IgfsDataOutputStream out;
-
-        /** Client session object. */
-        private IgfsClientSession ses;
-
-        /** Queue node for fast unlink. */
-        private ConcurrentLinkedDeque8.Node<ClientWorker> node;
-
-        /**
-         * Creates client worker.
-         *
-         * @param idx Worker index for worker thread naming.
-         * @param endpoint Connected client endpoint.
-         * @throws IgniteCheckedException If endpoint output stream cannot be obtained.
-         */
-        protected ClientWorker(IpcEndpoint endpoint, int idx) throws IgniteCheckedException {
-            super(ggfsCtx.kernalContext().gridName(), "ggfs-client-worker-" + idx, log);
-
-            this.endpoint = endpoint;
-
-            ses = new IgfsClientSession();
-
-            out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream()));
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            try {
-                IgfsDataInputStream dis = new IgfsDataInputStream(endpoint.inputStream());
-
-                byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE];
-
-                boolean first = true;
-
-                while (!Thread.currentThread().isInterrupted()) {
-                    dis.readFully(hdr);
-
-                    final long reqId = U.bytesToLong(hdr, 0);
-
-                    int ordinal = U.bytesToInt(hdr, 8);
-
-                    if (first) { // First message must be HANDSHAKE.
-                        if (reqId != 0 || ordinal != IgfsIpcCommand.HANDSHAKE.ordinal()) {
-                            U.warn(log, "Handshake failed.");
-
-                            return;
-                        }
-
-                        first = false;
-                    }
-
-                    final IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(ordinal);
-
-                    IgfsMessage msg = marsh.unmarshall(cmd, hdr, dis);
-
-                    IgniteInternalFuture<IgfsMessage> fut = hnd.handleAsync(ses, msg, dis);
-
-                    // If fut is null, no response is required.
-                    if (fut != null) {
-                        if (fut.isDone()) {
-                            IgfsMessage res;
-
-                            try {
-                                res = fut.get();
-                            }
-                            catch (IgniteCheckedException e) {
-                                res = new IgfsControlResponse();
-
-                                ((IgfsControlResponse)res).error(e);
-                            }
-
-                            try {
-                                synchronized (out) {
-                                    // Reuse header.
-                                    IgfsMarshaller.fillHeader(hdr, reqId, res.command());
-
-                                    marsh.marshall(res, hdr, out);
-
-                                    out.flush();
-                                }
-                            }
-                            catch (IOException | IgniteCheckedException e) {
-                                shutdown0(e);
-                            }
-                        }
-                        else {
-                            fut.listenAsync(new CIX1<IgniteInternalFuture<IgfsMessage>>() {
-                                @Override public void applyx(IgniteInternalFuture<IgfsMessage> fut) {
-                                    IgfsMessage res;
-
-                                    try {
-                                        res = fut.get();
-                                    }
-                                    catch (IgniteCheckedException e) {
-                                        res = new IgfsControlResponse();
-
-                                        ((IgfsControlResponse)res).error(e);
-                                    }
-
-                                    try {
-                                        synchronized (out) {
-                                            byte[] hdr = IgfsMarshaller.createHeader(reqId, res.command());
-
-                                            marsh.marshall(res, hdr, out);
-
-                                            out.flush();
-                                        }
-                                    }
-                                    catch (IOException | IgniteCheckedException e) {
-                                        shutdown0(e);
-                                    }
-                                }
-                            });
-                        }
-                    }
-                }
-            }
-            catch (EOFException ignored) {
-                // Client closed connection.
-            }
-            catch (IgniteCheckedException | IOException e) {
-                if (!isCancelled())
-                    U.error(log, "Failed to read data from client (will close connection)", e);
-            }
-            finally {
-                onFinished();
-            }
-        }
-
-        /**
-         * @param node Node in queue for this worker.
-         */
-        public void node(ConcurrentLinkedDeque8.Node<ClientWorker> node) {
-            this.node = node;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            super.cancel();
-
-            shutdown0(null);
-        }
-
-        /**
-         * @param e Optional exception that caused this client reader to stop.
-         */
-        private void shutdown0(@Nullable Throwable e) {
-            if (!isCancelled()) {
-                if (e != null)
-                    U.error(log, "Stopping client reader due to exception: " + endpoint, e);
-            }
-
-            U.closeQuiet(out);
-
-            endpoint.close();
-        }
-
-        /**
-         * Final resource cleanup.
-         */
-        private void onFinished() {
-            // Second close is no-op, if closed manually.
-            U.closeQuiet(out);
-
-            endpoint.close();
-
-            // Finally, remove from queue.
-            if (clientWorkers.unlinkx(node))
-                hnd.onClosed(ses);
-        }
-    }
-
-    /**
-     * Accept worker.
-     */
-    private class AcceptWorker extends GridWorker {
-        /** Accept index. */
-        private int acceptCnt;
-
-        /**
-         * Creates accept worker.
-         */
-        protected AcceptWorker() {
-            super(ggfsCtx.kernalContext().gridName(), "ggfs-accept-worker", log);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            try {
-                while (!Thread.currentThread().isInterrupted()) {
-                    IpcEndpoint client = srvEndpoint.accept();
-
-                    if (log.isDebugEnabled())
-                        log.debug("GGFS client connected [ggfsName=" + ggfsCtx.kernalContext().gridName() +
-                            ", client=" + client + ']');
-
-                    ClientWorker worker = new ClientWorker(client, acceptCnt++);
-
-                    IgniteThread workerThread = new IgniteThread(worker);
-
-                    ConcurrentLinkedDeque8.Node<ClientWorker> node = clientWorkers.addx(worker);
-
-                    worker.node(node);
-
-                    workerThread.start();
-                }
-            }
-            catch (IgniteCheckedException e) {
-                if (!isCancelled())
-                    U.error(log, "Failed to accept client IPC connection (will shutdown accept thread).", e);
-            }
-            finally {
-                srvEndpoint.close();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            super.cancel();
-
-            srvEndpoint.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerHandler.java
deleted file mode 100644
index d7c7e11..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.igfs.common.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * GGFS server message handler. Server component that is plugged in to the server implementation
- * to handle incoming messages asynchronously.
- */
-public interface IgfsServerHandler {
-    /**
-     * Asynchronously handles incoming message.
-     *
-     * @param ses Client session.
-     * @param msg Message to process.
-     * @param in Data input. Stream to read from if this is a WRITE_BLOCK message.
-     * @return Future that will be completed when response is ready or {@code null} if no
-     *      response is required.
-     */
-    @Nullable public IgniteInternalFuture<IgfsMessage> handleAsync(IgfsClientSession ses,
-        IgfsMessage msg, DataInput in);
-
-    /**
-     * Handles client close events.
-     *
-     * @param ses Session that was closed.
-     */
-    public void onClosed(IgfsClientSession ses);
-
-    /**
-     * Stops handling of incoming requests. No server commands will be handled anymore.
-     *
-     * @throws IgniteCheckedException If error occurred.
-     */
-    public void stop() throws IgniteCheckedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerManager.java
deleted file mode 100644
index 07322e8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerManager.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.ipc.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.configuration.IgfsConfiguration.*;
-
-/**
- * GGFS server manager.
- */
-public class IgfsServerManager extends IgfsManager {
-    /** IPC server rebind interval. */
-    private static final long REBIND_INTERVAL = 3000;
-
-    /** Collection of servers to maintain. */
-    private Collection<IgfsServer> srvrs;
-
-    /** Server port binders. */
-    private BindWorker bindWorker;
-
-    /** Kernal start latch. */
-    private CountDownLatch kernalStartLatch = new CountDownLatch(1);
-
-    /** {@inheritDoc} */
-    @Override protected void start0() throws IgniteCheckedException {
-        IgfsConfiguration ggfsCfg = igfsCtx.configuration();
-        Map<String,String> cfg = ggfsCfg.getIpcEndpointConfiguration();
-
-        if (F.isEmpty(cfg)) {
-            // Set default configuration.
-            cfg = new HashMap<>();
-
-            cfg.put("type", U.isWindows() ? "tcp" : "shmem");
-            cfg.put("port", String.valueOf(DFLT_IPC_PORT));
-        }
-
-        if (ggfsCfg.isIpcEndpointEnabled())
-            bind(cfg, /*management*/false);
-
-        if (ggfsCfg.getManagementPort() >= 0) {
-            cfg = new HashMap<>();
-
-            cfg.put("type", "tcp");
-            cfg.put("port", String.valueOf(ggfsCfg.getManagementPort()));
-
-            bind(cfg, /*management*/true);
-        }
-
-        if (bindWorker != null)
-            new IgniteThread(bindWorker).start();
-    }
-
-    /**
-     * Tries to start server endpoint with specified configuration. If failed, will print warning and start a thread
-     * that will try to periodically start this endpoint.
-     *
-     * @param endpointCfg Endpoint configuration to start.
-     * @param mgmt {@code True} if endpoint is management.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void bind(final Map<String,String> endpointCfg, final boolean mgmt) throws IgniteCheckedException {
-        if (srvrs == null)
-            srvrs = new ConcurrentLinkedQueue<>();
-
-        IgfsServer ipcSrv = new IgfsServer(igfsCtx, endpointCfg, mgmt);
-
-        try {
-            ipcSrv.start();
-
-            srvrs.add(ipcSrv);
-        }
-        catch (IpcEndpointBindException ignored) {
-            int port = ipcSrv.getIpcServerEndpoint().getPort();
-
-            String portMsg = port != -1 ? " Failed to bind to port (is port already in use?): " + port : "";
-
-            U.warn(log, "Failed to start GGFS " + (mgmt ? "management " : "") + "endpoint " +
-                "(will retry every " + (REBIND_INTERVAL / 1000) + "s)." +
-                portMsg);
-
-            if (bindWorker == null)
-                bindWorker = new BindWorker();
-
-            bindWorker.addConfiguration(endpointCfg, mgmt);
-        }
-    }
-
-    /**
-     * @return Collection of active endpoints.
-     */
-    public Collection<IpcServerEndpoint> endpoints() {
-        return F.viewReadOnly(srvrs, new C1<IgfsServer, IpcServerEndpoint>() {
-            @Override public IpcServerEndpoint apply(IgfsServer e) {
-                return e.getIpcServerEndpoint();
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onKernalStart0() throws IgniteCheckedException {
-        if (!F.isEmpty(srvrs)) {
-            for (IgfsServer srv : srvrs)
-                srv.onKernalStart();
-        }
-
-        kernalStartLatch.countDown();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void stop0(boolean cancel) {
-        // Safety.
-        kernalStartLatch.countDown();
-
-        if (bindWorker != null) {
-            bindWorker.cancel();
-
-            U.join(bindWorker, log);
-        }
-
-        if (!F.isEmpty(srvrs)) {
-            for (IgfsServer srv : srvrs)
-                srv.stop(cancel);
-        }
-    }
-
-    /**
-     * Bind worker.
-     */
-    @SuppressWarnings("BusyWait")
-    private class BindWorker extends GridWorker {
-        /** Configurations to bind. */
-        private Collection<IgniteBiTuple<Map<String, String>, Boolean>> bindCfgs = new LinkedList<>();
-
-        /**
-         * Constructor.
-         */
-        private BindWorker() {
-            super(igfsCtx.kernalContext().gridName(), "bind-worker", igfsCtx.kernalContext().log());
-        }
-
-        /**
-         * Adds configuration to bind on. Should not be called after thread start.
-         *
-         * @param cfg Configuration.
-         * @param mgmt Management flag.
-         */
-        public void addConfiguration(Map<String, String> cfg, boolean mgmt) {
-            bindCfgs.add(F.t(cfg, mgmt));
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            kernalStartLatch.await();
-
-            while (!isCancelled()) {
-                Thread.sleep(REBIND_INTERVAL);
-
-                Iterator<IgniteBiTuple<Map<String, String>, Boolean>> it = bindCfgs.iterator();
-
-                while (it.hasNext()) {
-                    IgniteBiTuple<Map<String, String>, Boolean> cfg = it.next();
-
-                    IgfsServer ipcSrv = new IgfsServer(igfsCtx, cfg.get1(), cfg.get2());
-
-                    try {
-                        ipcSrv.start();
-
-                        ipcSrv.onKernalStart();
-
-                        srvrs.add(ipcSrv);
-
-                        it.remove();
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (GridWorker.log.isDebugEnabled())
-                            GridWorker.log.debug("Failed to bind GGFS endpoint [cfg=" + cfg + ", err=" + e.getMessage() + ']');
-                    }
-                }
-
-                if (bindCfgs.isEmpty())
-                    break;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsStatus.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsStatus.java
deleted file mode 100644
index 2300774..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsStatus.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import java.io.*;
-
-/**
- * GGFS response for status request.
- */
-public class IgfsStatus implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Total space size. */
-    private long spaceTotal;
-
-    /** Used space in GGFS. */
-    private long spaceUsed;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public IgfsStatus() {
-        // No-op.
-    }
-
-    /**
-     * @param spaceUsed Used space in GGFS.
-     * @param spaceTotal Total space available in GGFS.
-     */
-    public IgfsStatus(long spaceUsed, long spaceTotal) {
-        this.spaceUsed = spaceUsed;
-        this.spaceTotal = spaceTotal;
-    }
-
-    /**
-     * @return Total space available in GGFS.
-     */
-    public long spaceTotal() {
-        return spaceTotal;
-    }
-
-    /**
-     * @return Used space in GGFS.
-     */
-    public long spaceUsed() {
-        return spaceUsed;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeLong(spaceUsed);
-        out.writeLong(spaceTotal);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        spaceUsed = in.readLong();
-        spaceTotal = in.readLong();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSyncMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSyncMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSyncMessage.java
deleted file mode 100644
index 41f4145..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSyncMessage.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Basic sync message.
- */
-public class IgfsSyncMessage extends IgfsCommunicationMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Coordinator node order. */
-    private long order;
-
-    /** Response flag. */
-    private boolean res;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public IgfsSyncMessage() {
-        // No-op.
-    }
-
-    /**
-     * @param order Node order.
-     * @param res Response flag.
-     */
-    public IgfsSyncMessage(long order, boolean res) {
-        this.order = order;
-        this.res = res;
-    }
-
-    /**
-     * @return Coordinator node order.
-     */
-    public long order() {
-        return order;
-    }
-
-    /**
-     * @return {@code True} if response message.
-     */
-    public boolean response() {
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsSyncMessage.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public MessageAdapter clone() {
-        IgfsSyncMessage _clone = new IgfsSyncMessage();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clone0(MessageAdapter _msg) {
-        super.clone0(_msg);
-
-        IgfsSyncMessage _clone = (IgfsSyncMessage)_msg;
-
-        _clone.order = order;
-        _clone.res = res;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean writeTo(ByteBuffer buf) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!typeWritten) {
-            if (!writer.writeByte(null, directType()))
-                return false;
-
-            typeWritten = true;
-        }
-
-        switch (state) {
-            case 0:
-                if (!writer.writeLong("order", order))
-                    return false;
-
-                state++;
-
-            case 1:
-                if (!writer.writeBoolean("res", res))
-                    return false;
-
-                state++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean readFrom(ByteBuffer buf) {
-        reader.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (state) {
-            case 0:
-                order = reader.readLong("order");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                state++;
-
-            case 1:
-                res = reader.readBoolean("res");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                state++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 71;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsTaskArgsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsTaskArgsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsTaskArgsImpl.java
deleted file mode 100644
index 7139c12..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsTaskArgsImpl.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.igfs.mapreduce.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * GGFS task arguments implementation.
- */
-public class IgfsTaskArgsImpl<T> implements IgfsTaskArgs<T>,  Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** GGFS name. */
-    private String ggfsName;
-
-    /** Paths. */
-    private Collection<IgfsPath> paths;
-
-    /** Record resolver. */
-    private IgfsRecordResolver recRslvr;
-
-    /** Skip non existent files flag. */
-    private boolean skipNonExistentFiles;
-
-    /** Maximum range length. */
-    private long maxRangeLen;
-
-    /** User argument. */
-    private T usrArg;
-
-    /**
-     * {@link Externalizable} support.
-     */
-    public IgfsTaskArgsImpl() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param ggfsName GGFS name.
-     * @param paths Paths.
-     * @param recRslvr Record resolver.
-     * @param skipNonExistentFiles Skip non existent files flag.
-     * @param maxRangeLen Maximum range length.
-     * @param usrArg User argument.
-     */
-    public IgfsTaskArgsImpl(String ggfsName, Collection<IgfsPath> paths, IgfsRecordResolver recRslvr,
-        boolean skipNonExistentFiles, long maxRangeLen, T usrArg) {
-        this.ggfsName = ggfsName;
-        this.paths = paths;
-        this.recRslvr = recRslvr;
-        this.skipNonExistentFiles = skipNonExistentFiles;
-        this.maxRangeLen = maxRangeLen;
-        this.usrArg = usrArg;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String ggfsName() {
-        return ggfsName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> paths() {
-        return paths;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsRecordResolver recordResolver() {
-        return recRslvr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean skipNonExistentFiles() {
-        return skipNonExistentFiles;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long maxRangeLength() {
-        return maxRangeLen;
-    }
-
-    /** {@inheritDoc} */
-    @Override public T userArgument() {
-        return usrArg;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsTaskArgsImpl.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeString(out, ggfsName);
-        U.writeCollection(out, paths);
-
-        out.writeObject(recRslvr);
-        out.writeBoolean(skipNonExistentFiles);
-        out.writeLong(maxRangeLen);
-        out.writeObject(usrArg);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Reads the fields in the order {@link #writeExternal(ObjectOutput)} writes them: GGFS name, paths,
-     * record resolver, skip-non-existent-files flag, maximum range length and user argument. The cast of the
-     * user argument to {@code T} is unchecked.
-     */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        ggfsName = U.readString(in);
-        paths = U.readCollection(in);
-
-        recRslvr = (IgfsRecordResolver)in.readObject();
-        skipNonExistentFiles = in.readBoolean();
-        maxRangeLen = in.readLong();
-        usrArg = (T)in.readObject();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsThread.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsThread.java
deleted file mode 100644
index ba1fd6e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsThread.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.internal.util.typedef.*;
-
-/**
- * GGFS ad-hoc thread.
- * <p>
- * Subclasses implement {@link #body()} and may override {@link #cleanup()}; {@link #run()} is final and
- * wraps both calls with error handling.
- */
-public abstract class IgfsThread extends Thread {
-    /**
-     * Creates {@code GGFS} ad-hoc thread named {@code "ggfs-worker"}.
-     */
-    protected IgfsThread() {
-        super("ggfs-worker");
-    }
-
-    /**
-     * Creates {@code GGFS} ad-hoc thread.
-     *
-     * @param name Thread name.
-     */
-    protected IgfsThread(String name) {
-        super(name);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Invokes {@link #body()}. On {@link InterruptedException} it calls {@code interrupt()} on this thread;
-     * any other {@link Throwable} is reported through {@code X.error(...)} and its stack trace is printed,
-     * so it does not propagate out of this method. {@link #cleanup()} is always invoked afterwards, and a
-     * failure in it is reported the same way.
-     */
-    @Override public final void run() {
-        try {
-            body();
-        }
-        catch (InterruptedException ignore) {
-            interrupt();
-        }
-        // Catch all.
-        catch (Throwable e) {
-            X.error("Failed to execute GGFS ad-hoc thread: " + e.getMessage());
-
-            e.printStackTrace();
-        }
-        finally {
-            try {
-                cleanup();
-            }
-            // Catch all.
-            catch (Throwable e) {
-                X.error("Failed to clean up GGFS ad-hoc thread: " + e.getMessage());
-
-                e.printStackTrace();
-            }
-        }
-    }
-
-    /**
-     * Thread body.
-     *
-     * @throws InterruptedException If interrupted.
-     */
-    protected abstract void body() throws InterruptedException;
-
-    /**
-     * Cleanup hook invoked from the {@code finally} block of {@link #run()}; does nothing by default.
-     */
-    protected void cleanup() {
-        // No-op.
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html
deleted file mode 100644
index 60e49f9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
-    <!-- Package description. -->
-    Contains high performance file system processor.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
new file mode 100644
index 0000000..e1051c6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Block write request acknowledgement message.
+ */
+public class IgfsAckMessage extends IgfsCommunicationMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** File id. */
+    private IgniteUuid fileId;
+
+    /** Request ID to ack. */
+    private long id;
+
+    /** Write exception. */
+    @GridDirectTransient
+    private IgniteCheckedException err;
+
+    /** */
+    private byte[] errBytes;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public IgfsAckMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param fileId File ID.
+     * @param id Request ID.
+     * @param err Error.
+     */
+    public IgfsAckMessage(IgniteUuid fileId, long id, @Nullable IgniteCheckedException err) {
+        this.fileId = fileId;
+        this.id = id;
+        this.err = err;
+    }
+
+    /**
+     * @return File ID.
+     */
+    public IgniteUuid fileId() {
+        return fileId;
+    }
+
+    /**
+     * @return ID of the request being acknowledged.
+     */
+    public long id() {
+        return id;
+    }
+
+    /**
+     * @return Error occurred when writing this batch, if any.
+     */
+    public IgniteCheckedException error() {
+        return err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
+        super.prepareMarshal(marsh);
+
+        if (err != null)
+            errBytes = marsh.marshal(err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(marsh, ldr);
+
+        if (errBytes != null)
+            err = marsh.unmarshal(errBytes, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public MessageAdapter clone() {
+        IgfsAckMessage _clone = new IgfsAckMessage();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(MessageAdapter _msg) {
+        super.clone0(_msg);
+
+        IgfsAckMessage _clone = (IgfsAckMessage)_msg;
+
+        _clone.fileId = fileId;
+        _clone.id = id;
+        _clone.err = err;
+        _clone.errBytes = errBytes;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the superclass part and the one-time {@link #directType()} byte, writes {@code errBytes},
+     * {@code fileId} and {@code id} in that order. The {@code switch} cases have no {@code break}: writing
+     * starts at the field selected by {@code state} and falls through to the following ones. The method
+     * returns {@code false} as soon as a write call returns {@code false}; {@code state} is advanced only
+     * after a field was written.
+     */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!typeWritten) {
+            if (!writer.writeByte(null, directType()))
+                return false;
+
+            typeWritten = true;
+        }
+
+        switch (state) {
+            case 0:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                state++;
+
+            case 1:
+                if (!writer.writeIgniteUuid("fileId", fileId))
+                    return false;
+
+                state++;
+
+            case 2:
+                if (!writer.writeLong("id", id))
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the superclass part, reads {@code errBytes}, {@code fileId} and {@code id} in that order. The
+     * {@code switch} cases have no {@code break}: reading starts at the field selected by {@code state} and
+     * falls through to the following ones. The method returns {@code false} as soon as
+     * {@code reader.isLastRead()} reports {@code false}; {@code state} is advanced only after a field was
+     * read completely.
+     */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        reader.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (state) {
+            case 0:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 1:
+                fileId = reader.readIgniteUuid("fileId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 2:
+                id = reader.readLong("id");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 64;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
new file mode 100644
index 0000000..da37279
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.mapreduce.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.util.*;
+
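+/**
+ * {@link IgfsEx} wrapper around {@link IgfsImpl} that supports asynchronous operations.
+ * <p>
+ * {@code format()} and the {@code execute(...)} overloads are routed through {@code saveOrGet(...)} using the
+ * corresponding {@code *Async} call on the wrapped instance; every other method delegates directly to it.
+ */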
+public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFs> implements IgfsEx {
+    /** */
+    private final IgfsImpl ggfs;
+
+    /**
+     * @param ggfs Ggfs.
+     */
+    public IgfsAsyncImpl(IgfsImpl ggfs) {
+        super(true);
+
+        this.ggfs = ggfs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void format() {
+        try {
+            saveOrGet(ggfs.formatAsync());
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+        Collection<IgfsPath> paths, @Nullable T arg) {
+        try {
+            return saveOrGet(ggfs.executeAsync(task, rslvr, paths, arg));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+        Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) {
+        try {
+            return saveOrGet(ggfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
+        @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) {
+        try {
+            return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, arg));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
+        @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles,
+        long maxRangeLen, @Nullable T arg) {
+        try {
+            return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        ggfs.stop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsContext context() {
+        return ggfs.context();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPaths proxyPaths() {
+        return ggfs.proxyPaths();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize,
+        int seqReadsBeforePrefetch) {
+        return ggfs.open(path, bufSize, seqReadsBeforePrefetch);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgfsPath path) {
+        return ggfs.open(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) {
+        return ggfs.open(path, bufSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsStatus globalSpace() throws IgniteCheckedException {
+        return ggfs.globalSpace();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException {
+        ggfs.globalSampling(val);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Boolean globalSampling() {
+        return ggfs.globalSampling();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsLocalMetrics localMetrics() {
+        return ggfs.localMetrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long groupBlockSize() {
+        return ggfs.groupBlockSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
+        return ggfs.awaitDeletesAsync();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String clientLogDirectory() {
+        return ggfs.clientLogDirectory();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clientLogDirectory(String logDir) {
+        ggfs.clientLogDirectory(logDir);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean evictExclude(IgfsPath path, boolean primary) {
+        return ggfs.evictExclude(path, primary);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid nextAffinityKey() {
+        return ggfs.nextAffinityKey();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isProxy(URI path) {
+        return ggfs.isProxy(path);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String name() {
+        return ggfs.name();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsConfiguration configuration() {
+        return ggfs.configuration();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPathSummary summary(IgfsPath path) {
+        return ggfs.summary(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) {
+        return ggfs.create(path, overwrite);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
+        long blockSize, @Nullable Map<String, String> props) {
+        return ggfs.create(path, bufSize, overwrite, replication, blockSize, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite,
+        @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) {
+        return ggfs.create(path, bufSize, overwrite, affKey, replication, blockSize, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsOutputStream append(IgfsPath path, boolean create) {
+        return ggfs.append(path, create);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create,
+        @Nullable Map<String, String> props) {
+        return ggfs.append(path, bufSize, create, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) {
+        ggfs.setTimes(path, accessTime, modificationTime);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
+        return ggfs.affinity(path, start, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, long maxLen) {
+        return ggfs.affinity(path, start, len, maxLen);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsMetrics metrics() {
+        return ggfs.metrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resetMetrics() {
+        ggfs.resetMetrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long size(IgfsPath path) {
+        return ggfs.size(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists(IgfsPath path) {
+        return ggfs.exists(path);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
+        return ggfs.update(path, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rename(IgfsPath src, IgfsPath dest) {
+        ggfs.rename(src, dest);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(IgfsPath path, boolean recursive) {
+        return ggfs.delete(path, recursive);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path) {
+        ggfs.mkdirs(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
+        ggfs.mkdirs(path, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
+        return ggfs.listPaths(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
+        return ggfs.listFiles(path);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgfsFile info(IgfsPath path) {
+        return ggfs.info(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long usedSpaceSize() {
+        return ggfs.usedSpaceSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, String> properties() {
+        return ggfs.properties();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAttributes.java
new file mode 100644
index 0000000..31b696e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAttributes.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * GGFS attributes.
+ * <p>
+ * This class contains information on a single GGFS configured on some node.
+ */
+public class IgfsAttributes implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** GGFS name. */
+    private String ggfsName;
+
+    /** File's data block size (bytes). */
+    private int blockSize;
+
+    /** Size of the group figured in {@link org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper}. */
+    private int grpSize;
+
+    /** Meta cache name. */
+    private String metaCacheName;
+
+    /** Data cache name. */
+    private String dataCacheName;
+
+    /** Default mode. */
+    private IgfsMode dfltMode;
+
+    /** Fragmentizer enabled flag. */
+    private boolean fragmentizerEnabled;
+
+    /** Path modes. */
+    private Map<String, IgfsMode> pathModes;
+
+    /**
+     * @param ggfsName GGFS name.
+     * @param blockSize File's data block size (bytes).
+     * @param grpSize Size of the group figured in {@link org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper}.
+     * @param metaCacheName Meta cache name.
+     * @param dataCacheName Data cache name.
+     * @param dfltMode Default mode.
+     * @param pathModes Path modes.
+     * @param fragmentizerEnabled Fragmentizer enabled flag.
+     */
+    public IgfsAttributes(String ggfsName, int blockSize, int grpSize, String metaCacheName, String dataCacheName,
+        IgfsMode dfltMode, Map<String, IgfsMode> pathModes, boolean fragmentizerEnabled) {
+        this.blockSize = blockSize;
+        this.ggfsName = ggfsName;
+        this.grpSize = grpSize;
+        this.metaCacheName = metaCacheName;
+        this.dataCacheName = dataCacheName;
+        this.dfltMode = dfltMode;
+        this.pathModes = pathModes;
+        this.fragmentizerEnabled = fragmentizerEnabled;
+    }
+
+    /**
+     * Public no-arg constructor for {@link Externalizable}.
+     */
+    public IgfsAttributes() {
+        // No-op.
+    }
+
+    /**
+     * @return GGFS name.
+     */
+    public String ggfsName() {
+        return ggfsName;
+    }
+
+    /**
+     * @return File's data block size (bytes).
+     */
+    public int blockSize() {
+        return blockSize;
+    }
+
+    /**
+     * @return Size of the group figured in {@link org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper}.
+     */
+    public int groupSize() {
+        return grpSize;
+    }
+
+    /**
+     * @return Metadata cache name.
+     */
+    public String metaCacheName() {
+        return metaCacheName;
+    }
+
+    /**
+     * @return Data cache name.
+     */
+    public String dataCacheName() {
+        return dataCacheName;
+    }
+
+    /**
+     * @return Default mode.
+     */
+    public IgfsMode defaultMode() {
+        return dfltMode;
+    }
+
+    /**
+     * @return Path modes.
+     */
+    public Map<String, IgfsMode> pathModes() {
+        return pathModes != null ? Collections.unmodifiableMap(pathModes) : null;
+    }
+
+    /**
+     * @return {@code True} if fragmentizer is enabled.
+     */
+    public boolean fragmentizerEnabled() {
+        return fragmentizerEnabled;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Writes, in order: GGFS name, block size, group size, meta cache name, data cache name, default mode and
+     * the fragmentizer flag. Path modes follow as a presence flag and, when the map is not {@code null}, the
+     * entry count and each path/mode pair.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, ggfsName);
+        out.writeInt(blockSize);
+        out.writeInt(grpSize);
+        U.writeString(out, metaCacheName);
+        U.writeString(out, dataCacheName);
+        U.writeEnum(out, dfltMode);
+        out.writeBoolean(fragmentizerEnabled);
+
+        if (pathModes != null) {
+            out.writeBoolean(true);
+
+            out.writeInt(pathModes.size());
+
+            for (Map.Entry<String, IgfsMode> pathMode : pathModes.entrySet()) {
+                U.writeString(out, pathMode.getKey());
+                U.writeEnum(out, pathMode.getValue());
+            }
+        }
+        else
+            out.writeBoolean(false);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads the fields in the order {@link #writeExternal(ObjectOutput)} writes them. Modes are decoded with
+     * {@code IgfsMode.fromOrdinal(in.readByte())}. When the path modes presence flag is {@code false},
+     * {@code pathModes} is not assigned and keeps its current value.
+     * <p>
+     * NOTE(review): this presumes {@code U.writeEnum} emits exactly one byte per mode - confirm.
+     */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        ggfsName = U.readString(in);
+        blockSize = in.readInt();
+        grpSize = in.readInt();
+        metaCacheName = U.readString(in);
+        dataCacheName = U.readString(in);
+        dfltMode = IgfsMode.fromOrdinal(in.readByte());
+        fragmentizerEnabled = in.readBoolean();
+
+        if (in.readBoolean()) {
+            int size = in.readInt();
+
+            pathModes = new HashMap<>(size, 1.0f);
+
+            for (int i = 0; i < size; i++)
+                pathModes.put(U.readString(in), IgfsMode.fromOrdinal(in.readByte()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
new file mode 100644
index 0000000..304095c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.processors.task.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Key identifying a single binary data block of a file.
+ * <p>
+ * A key is made of the file ID, the block ID within that file, an optional affinity key
+ * and the eviction exclude flag.
+ */
+@GridInternal
+public final class IgfsBlockKey extends MessageAdapter implements Externalizable, Comparable<IgfsBlockKey> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** ID of the file this block belongs to. */
+    private IgniteUuid fileId;
+
+    /** ID of the block. */
+    private long blockId;
+
+    /** Affinity key of the block, may be {@code null}. */
+    private IgniteUuid affKey;
+
+    /** Whether the block is excluded from eviction. */
+    private boolean evictExclude;
+
+    /**
+     * Creates a key for a file's binary data block.
+     *
+     * @param fileId File ID, must not be {@code null}.
+     * @param affKey Affinity key, may be {@code null}.
+     * @param evictExclude Evict exclude flag.
+     * @param blockId Block ID, must not be negative.
+     */
+    public IgfsBlockKey(IgniteUuid fileId, @Nullable IgniteUuid affKey, boolean evictExclude, long blockId) {
+        assert fileId != null;
+        assert blockId >= 0;
+
+        this.fileId = fileId;
+        this.blockId = blockId;
+        this.affKey = affKey;
+        this.evictExclude = evictExclude;
+    }
+
+    /**
+     * No-arg constructor required by {@link Externalizable}.
+     */
+    public IgfsBlockKey() {
+        // No-op.
+    }
+
+    /**
+     * @return ID of the file this block belongs to.
+     */
+    public IgniteUuid getFileId() {
+        return fileId;
+    }
+
+    /**
+     * @return Affinity key of the block, possibly {@code null}.
+     */
+    public IgniteUuid affinityKey() {
+        return affKey;
+    }
+
+    /**
+     * @return Whether the block is excluded from eviction.
+     */
+    public boolean evictExclude() {
+        return evictExclude;
+    }
+
+    /**
+     * @return ID of the block.
+     */
+    public long getBlockId() {
+        return blockId;
+    }
+
+    /**
+     * Orders keys by file ID, then by block ID, then by affinity key. A key with a non-null
+     * affinity key sorts before a key with a {@code null} one. The eviction exclude flag does
+     * not take part in the ordering.
+     *
+     * @param o Key to compare with.
+     * @return Negative, zero or positive value as defined by {@link Comparable#compareTo(Object)}.
+     */
+    @Override public int compareTo(@NotNull IgfsBlockKey o) {
+        int cmp = fileId.compareTo(o.fileId);
+
+        if (cmp != 0)
+            return cmp;
+
+        if (blockId != o.blockId)
+            return blockId < o.blockId ? -1 : 1;
+
+        IgniteUuid otherAffKey = o.affKey;
+
+        // Null affinity key goes last.
+        if (affKey == null)
+            return otherAffKey == null ? 0 : 1;
+
+        return otherAffKey == null ? -1 : affKey.compareTo(otherAffKey);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // Field order here must mirror readExternal().
+        U.writeGridUuid(out, fileId);
+        U.writeGridUuid(out, affKey);
+        out.writeBoolean(evictExclude);
+        out.writeLong(blockId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        // Field order here must mirror writeExternal().
+        fileId = U.readGridUuid(in);
+        affKey = U.readGridUuid(in);
+        evictExclude = in.readBoolean();
+        blockId = in.readLong();
+    }
+
+    /**
+     * Hash is derived from the file ID and the block ID only.
+     *
+     * @return Hash code.
+     */
+    @Override public int hashCode() {
+        int blockHash = (int)(blockId ^ (blockId >>> 32));
+
+        return fileId.hashCode() + blockHash;
+    }
+
+    /**
+     * Two keys are equal when block ID, file ID, affinity key and eviction exclude flag all match.
+     *
+     * @param o Object to compare with.
+     * @return {@code True} if the given object is an equal key.
+     */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        IgfsBlockKey other = (IgfsBlockKey)o;
+
+        if (blockId != other.blockId)
+            return false;
+
+        if (!fileId.equals(other.fileId))
+            return false;
+
+        if (!F.eq(affKey, other.affKey))
+            return false;
+
+        return evictExclude == other.evictExclude;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public MessageAdapter clone() {
+        IgfsBlockKey copy = new IgfsBlockKey();
+
+        clone0(copy);
+
+        return copy;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(MessageAdapter _msg) {
+        IgfsBlockKey target = (IgfsBlockKey)_msg;
+
+        target.fileId = fileId;
+        target.affKey = affKey;
+        target.evictExclude = evictExclude;
+        target.blockId = blockId;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("fallthrough")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        writer.setBuffer(buf);
+
+        // The type byte is written once, before any field.
+        if (!typeWritten) {
+            if (!writer.writeByte(null, directType()))
+                return false;
+
+            typeWritten = true;
+        }
+
+        // Cases intentionally fall through: 'state' records the next field to write, so a call
+        // that returned false part-way continues from that field on the next invocation.
+        switch (state) {
+            case 0:
+                if (!writer.writeIgniteUuid("affKey", affKey))
+                    return false;
+
+                state++;
+
+            case 1:
+                if (!writer.writeLong("blockId", blockId))
+                    return false;
+
+                state++;
+
+            case 2:
+                if (!writer.writeBoolean("evictExclude", evictExclude))
+                    return false;
+
+                state++;
+
+            case 3:
+                if (!writer.writeIgniteUuid("fileId", fileId))
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("fallthrough")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        reader.setBuffer(buf);
+
+        // Mirror of writeTo(): same field order, same fall-through progression driven by 'state'.
+        switch (state) {
+            case 0:
+                affKey = reader.readIgniteUuid("affKey");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 1:
+                blockId = reader.readLong("blockId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 2:
+                evictExclude = reader.readBoolean("evictExclude");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 3:
+                fileId = reader.readIgniteUuid("fileId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 65;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsBlockKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
new file mode 100644
index 0000000..bbea11b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * File block location in the grid: a byte range of a file together with the IDs, names and
+ * hosts of the nodes holding it.
+ */
+public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Start position of the block range. */
+    private long start;
+
+    /** Length of the block range. */
+    private long len;
+
+    /** IDs of the nodes holding the range; may be {@code null} (see {@link #writeExternal(ObjectOutput)}). */
+    @GridToStringInclude
+    private Collection<UUID> nodeIds;
+
+    /** Node names in {@code host:port} form. */
+    private Collection<String> names;
+
+    /** Node host names. */
+    private Collection<String> hosts;
+
+    /**
+     * Empty constructor for externalizable.
+     */
+    public IgfsBlockLocationImpl() {
+        // No-op.
+    }
+
+    /**
+     * Creates a copy of the given location with a different length. Node IDs, names and hosts
+     * collections are shared with the source location, not copied.
+     *
+     * @param location HDFS block location.
+     * @param len New length.
+     */
+    public IgfsBlockLocationImpl(IgfsBlockLocation location, long len) {
+        assert location != null;
+
+        start = location.start();
+        this.len = len;
+
+        nodeIds = location.nodeIds();
+        names = location.names();
+        hosts = location.hosts();
+    }
+
+    /**
+     * @param start Start, must not be negative.
+     * @param len Length, must be positive.
+     * @param nodes Affinity nodes, must not be {@code null} or empty.
+     */
+    public IgfsBlockLocationImpl(long start, long len, Collection<ClusterNode> nodes) {
+        assert start >= 0;
+        assert len > 0;
+        assert nodes != null && !nodes.isEmpty();
+
+        this.start = start;
+        this.len = len;
+
+        convertFromNodes(nodes);
+    }
+
+    /**
+     * @return Start position.
+     */
+    @Override public long start() {
+        return start;
+    }
+
+    /**
+     * @return Length.
+     */
+    @Override public long length() {
+        return len;
+    }
+
+    /**
+     * @return Node IDs.
+     */
+    @Override public Collection<UUID> nodeIds() {
+        return nodeIds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> names() {
+        return names;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> hosts() {
+        return hosts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = (int)(start ^ (start >>> 32));
+
+        res = 31 * res + (int)(len ^ (len >>> 32));
+
+        // Node IDs are optional in the serialized form, so an instance may legitimately carry
+        // null here; equals() is null-safe and hashCode() has to be as well.
+        res = 31 * res + (nodeIds != null ? nodeIds.hashCode() : 0);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        IgfsBlockLocationImpl that = (IgfsBlockLocationImpl)o;
+
+        return len == that.len && start == that.start && F.eq(nodeIds, that.nodeIds) && F.eq(names, that.names) &&
+            F.eq(hosts, that.hosts);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsBlockLocationImpl.class, this);
+    }
+
+    /**
+     * Writes start, length, the optional node IDs (preceded by a presence flag), names and hosts.
+     * Names and hosts must be set before the object is written.
+     *
+     * @param out Output to write to.
+     * @throws IOException If write failed.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        assert names != null;
+        assert hosts != null;
+
+        out.writeLong(start);
+        out.writeLong(len);
+
+        // Presence flag for the optional node IDs section.
+        out.writeBoolean(nodeIds != null);
+
+        if (nodeIds != null) {
+            out.writeInt(nodeIds.size());
+
+            for (UUID nodeId : nodeIds)
+                U.writeUuid(out, nodeId);
+        }
+
+        out.writeInt(names.size());
+
+        for (String name : names)
+            out.writeUTF(name);
+
+        out.writeInt(hosts.size());
+
+        for (String host : hosts)
+            out.writeUTF(host);
+    }
+
+    /**
+     * Reads the state written by {@link #writeExternal(ObjectOutput)}. Node IDs are left
+     * untouched when the stream carries no node IDs section.
+     *
+     * @param in Input to read from.
+     * @throws IOException If read failed.
+     */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        start = in.readLong();
+        len = in.readLong();
+
+        int size;
+
+        // Presence flag for the optional node IDs section.
+        if (in.readBoolean()) {
+            size = in.readInt();
+
+            nodeIds = new ArrayList<>(size);
+
+            for (int i = 0; i < size; i++)
+                nodeIds.add(U.readUuid(in));
+        }
+
+        size = in.readInt();
+
+        names = new ArrayList<>(size);
+
+        for (int i = 0; i < size; i++)
+            names.add(in.readUTF());
+
+        size = in.readInt();
+
+        hosts = new ArrayList<>(size);
+
+        for (int i = 0; i < size; i++)
+            hosts.add(in.readUTF());
+    }
+
+    /**
+     * Converts collection of rich nodes to block location data and stores the result in
+     * {@link #nodeIds}, {@link #names} and {@link #hosts}.
+     *
+     * @param nodes Collection of affinity nodes.
+     */
+    private void convertFromNodes(Collection<ClusterNode> nodes) {
+        // Linked sets drop duplicate names/hosts while keeping encounter order.
+        Collection<String> names = new LinkedHashSet<>();
+        Collection<String> hosts = new LinkedHashSet<>();
+        Collection<UUID> nodeIds = new ArrayList<>(nodes.size());
+
+        for (final ClusterNode node : nodes) {
+            // Normalize host names into Hadoop-expected format.
+            try {
+                Collection<InetAddress> addrs = U.toInetAddresses(node);
+
+                for (InetAddress addr : addrs) {
+                    // Resolve once per address instead of once per use.
+                    String hostName = addr.getHostName();
+
+                    if (hostName == null)
+                        names.add(addr.getHostAddress() + ":" + 9001);
+                    else {
+                        names.add(hostName + ":" + 9001); // hostname:portNumber
+                        hosts.add(hostName);
+                    }
+                }
+            }
+            catch (IgniteCheckedException ignored) {
+                // Addresses could not be converted: fall back to the raw node addresses
+                // (no port suffix, and nothing is added to hosts for this node).
+                names.addAll(node.addresses());
+            }
+
+            nodeIds.add(node.id());
+        }
+
+        this.nodeIds = nodeIds;
+        this.names = names;
+        this.hosts = hosts;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java
new file mode 100644
index 0000000..90d244a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * IGFS write blocks message: carries a batch of file data blocks together with the ID of the
+ * file they belong to and the batch ID.
+ */
+public class IgfsBlocksMessage extends IgfsCommunicationMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** File ID. */
+    private IgniteUuid fileId;
+
+    /** Batch ID. */
+    private long id;
+
+    /** Blocks to store, keyed by block key. */
+    @GridDirectMap(keyType = IgfsBlockKey.class, valueType = byte[].class)
+    private Map<IgfsBlockKey, byte[]> blocks;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public IgfsBlocksMessage() {
+        // No-op.
+    }
+
+    /**
+     * Constructor. The given map is stored as is, without copying.
+     *
+     * @param fileId File ID.
+     * @param id Message (batch) ID.
+     * @param blocks Blocks to put in cache.
+     */
+    public IgfsBlocksMessage(IgniteUuid fileId, long id, Map<IgfsBlockKey, byte[]> blocks) {
+        this.fileId = fileId;
+        this.id = id;
+        this.blocks = blocks;
+    }
+
+    /**
+     * @return File ID.
+     */
+    public IgniteUuid fileId() {
+        return fileId;
+    }
+
+    /**
+     * @return Batch ID.
+     */
+    public long id() {
+        return id;
+    }
+
+    /**
+     * @return Map of blocks to put in cache.
+     */
+    public Map<IgfsBlockKey, byte[]> blocks() {
+        return blocks;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public MessageAdapter clone() {
+        IgfsBlocksMessage _clone = new IgfsBlocksMessage();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(MessageAdapter _msg) {
+        super.clone0(_msg);
+
+        IgfsBlocksMessage _clone = (IgfsBlocksMessage)_msg;
+
+        // Shallow copy: the clone references the same blocks map instance as this message.
+        _clone.fileId = fileId;
+        _clone.id = id;
+        _clone.blocks = blocks;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        writer.setBuffer(buf);
+
+        // Superclass state goes first; stop if it could not be fully written.
+        if (!super.writeTo(buf))
+            return false;
+
+        // The type byte is written once, before this class's own fields.
+        if (!typeWritten) {
+            if (!writer.writeByte(null, directType()))
+                return false;
+
+            typeWritten = true;
+        }
+
+        // Cases intentionally fall through: 'state' records the next field to write, so a call
+        // that returned false part-way continues from that field on the next invocation.
+        switch (state) {
+            case 0:
+                if (!writer.writeMap("blocks", blocks, IgfsBlockKey.class, byte[].class))
+                    return false;
+
+                state++;
+
+            case 1:
+                if (!writer.writeIgniteUuid("fileId", fileId))
+                    return false;
+
+                state++;
+
+            case 2:
+                if (!writer.writeLong("id", id))
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        reader.setBuffer(buf);
+
+        // Superclass state goes first; stop if it could not be fully read.
+        if (!super.readFrom(buf))
+            return false;
+
+        // Mirror of writeTo(): same field order, same fall-through progression driven by 'state'.
+        switch (state) {
+            case 0:
+                blocks = reader.readMap("blocks", IgfsBlockKey.class, byte[].class, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 1:
+                fileId = reader.readIgniteUuid("fileId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 2:
+                id = reader.readLong("id");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 66;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsClientSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsClientSession.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsClientSession.java
new file mode 100644
index 0000000..5da444d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsClientSession.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * IGFS client session. Effectively used to manage lifecycle of opened resources and close them on
+ * connection close.
+ * <p>
+ * Resources are kept in a concurrent map keyed by resource ID.
+ */
+public class IgfsClientSession {
+    /** Session resources keyed by resource ID. Final so the map is safely published with the session. */
+    private final ConcurrentMap<Long, Closeable> rsrcMap = new ConcurrentHashMap8<>();
+
+    /**
+     * Registers resource within this session. An already registered resource is never replaced.
+     *
+     * @param rsrcId Resource id.
+     * @param rsrc Resource to register.
+     * @return {@code True} if the resource was registered, {@code false} if another resource
+     *      is already associated with this ID.
+     */
+    public boolean registerResource(long rsrcId, Closeable rsrc) {
+        return rsrcMap.putIfAbsent(rsrcId, rsrc) == null;
+    }
+
+    /**
+     * Gets registered resource by ID.
+     * <p>
+     * The result is cast to the type expected by the caller without a runtime check, so
+     * asking for the wrong type fails with {@link ClassCastException} at the call site.
+     *
+     * @param rsrcId Resource ID.
+     * @return Resource or {@code null} if resource was not found.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable public <T> T resource(Long rsrcId) {
+        return (T)rsrcMap.get(rsrcId);
+    }
+
+    /**
+     * Unregister previously registered resource.
+     *
+     * @param rsrcId Resource ID.
+     * @param rsrc Resource to unregister.
+     * @return {@code True} if resource was unregistered, {@code false} if no resource
+     *      is associated with this ID or other resource is associated with this ID.
+     */
+    public boolean unregisterResource(Long rsrcId, Closeable rsrc) {
+        return rsrcMap.remove(rsrcId, rsrc);
+    }
+
+    /**
+     * @return Registered resources iterator (a view over the live map values, not a snapshot).
+     */
+    public Iterator<Closeable> registeredResources() {
+        return rsrcMap.values().iterator();
+    }
+}


Mime
View raw message