ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [09/57] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP.
Date Fri, 13 Feb 2015 10:54:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerHandler.java
deleted file mode 100644
index 470f56d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.fs.common.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * GGFS server message handler. Server component that is plugged in to the server implementation
- * to handle incoming messages asynchronously.
- */
-public interface GridGgfsServerHandler {
-    /**
-     * Asynchronously handles incoming message.
-     *
-     * @param ses Client session.
-     * @param msg Message to process.
-     * @param in Data input. Stream to read from in case if this is a WRITE_BLOCK message.
-     * @return Future that will be completed when response is ready or {@code null} if no
-     *      response is required.
-     */
-    @Nullable public IgniteInternalFuture<GridGgfsMessage> handleAsync(GridGgfsClientSession ses,
-        GridGgfsMessage msg, DataInput in);
-
-    /**
-     * Handles handles client close events.
-     *
-     * @param ses Session that was closed.
-     */
-    public void onClosed(GridGgfsClientSession ses);
-
-    /**
-     * Stops handling of incoming requests. No server commands will be handled anymore.
-     *
-     * @throws IgniteCheckedException If error occurred.
-     */
-    public void stop() throws IgniteCheckedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManager.java
deleted file mode 100644
index c31bce6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsServerManager.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.ipc.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.configuration.IgniteFsConfiguration.*;
-
-/**
- * GGFS server manager.
- */
-public class GridGgfsServerManager extends GridGgfsManager {
-    /** IPC server rebind interval. */
-    private static final long REBIND_INTERVAL = 3000;
-
-    /** Collection of servers to maintain. */
-    private Collection<IgfsServer> srvrs;
-
-    /** Server port binders. */
-    private BindWorker bindWorker;
-
-    /** Kernal start latch. */
-    private CountDownLatch kernalStartLatch = new CountDownLatch(1);
-
-    /** {@inheritDoc} */
-    @Override protected void start0() throws IgniteCheckedException {
-        IgniteFsConfiguration ggfsCfg = ggfsCtx.configuration();
-        Map<String,String> cfg = ggfsCfg.getIpcEndpointConfiguration();
-
-        if (F.isEmpty(cfg)) {
-            // Set default configuration.
-            cfg = new HashMap<>();
-
-            cfg.put("type", U.isWindows() ? "tcp" : "shmem");
-            cfg.put("port", String.valueOf(DFLT_IPC_PORT));
-        }
-
-        if (ggfsCfg.isIpcEndpointEnabled())
-            bind(cfg, /*management*/false);
-
-        if (ggfsCfg.getManagementPort() >= 0) {
-            cfg = new HashMap<>();
-
-            cfg.put("type", "tcp");
-            cfg.put("port", String.valueOf(ggfsCfg.getManagementPort()));
-
-            bind(cfg, /*management*/true);
-        }
-
-        if (bindWorker != null)
-            new IgniteThread(bindWorker).start();
-    }
-
-    /**
-     * Tries to start server endpoint with specified configuration. If failed, will print warning and start a thread
-     * that will try to periodically start this endpoint.
-     *
-     * @param endpointCfg Endpoint configuration to start.
-     * @param mgmt {@code True} if endpoint is management.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void bind(final Map<String,String> endpointCfg, final boolean mgmt) throws IgniteCheckedException {
-        if (srvrs == null)
-            srvrs = new ConcurrentLinkedQueue<>();
-
-        IgfsServer ipcSrv = new IgfsServer(ggfsCtx, endpointCfg, mgmt);
-
-        try {
-            ipcSrv.start();
-
-            srvrs.add(ipcSrv);
-        }
-        catch (IpcEndpointBindException ignored) {
-            int port = ipcSrv.getIpcServerEndpoint().getPort();
-
-            String portMsg = port != -1 ? " Failed to bind to port (is port already in use?): " + port : "";
-
-            U.warn(log, "Failed to start GGFS " + (mgmt ? "management " : "") + "endpoint " +
-                "(will retry every " + (REBIND_INTERVAL / 1000) + "s)." +
-                portMsg);
-
-            if (bindWorker == null)
-                bindWorker = new BindWorker();
-
-            bindWorker.addConfiguration(endpointCfg, mgmt);
-        }
-    }
-
-    /**
-     * @return Collection of active endpoints.
-     */
-    public Collection<IpcServerEndpoint> endpoints() {
-        return F.viewReadOnly(srvrs, new C1<IgfsServer, IpcServerEndpoint>() {
-            @Override public IpcServerEndpoint apply(IgfsServer e) {
-                return e.getIpcServerEndpoint();
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onKernalStart0() throws IgniteCheckedException {
-        if (!F.isEmpty(srvrs)) {
-            for (IgfsServer srv : srvrs)
-                srv.onKernalStart();
-        }
-
-        kernalStartLatch.countDown();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void stop0(boolean cancel) {
-        // Safety.
-        kernalStartLatch.countDown();
-
-        if (bindWorker != null) {
-            bindWorker.cancel();
-
-            U.join(bindWorker, log);
-        }
-
-        if (!F.isEmpty(srvrs)) {
-            for (IgfsServer srv : srvrs)
-                srv.stop(cancel);
-        }
-    }
-
-    /**
-     * Bind worker.
-     */
-    @SuppressWarnings("BusyWait")
-    private class BindWorker extends GridWorker {
-        /** Configurations to bind. */
-        private Collection<IgniteBiTuple<Map<String, String>, Boolean>> bindCfgs = new LinkedList<>();
-
-        /**
-         * Constructor.
-         */
-        private BindWorker() {
-            super(ggfsCtx.kernalContext().gridName(), "bind-worker", ggfsCtx.kernalContext().log());
-        }
-
-        /**
-         * Adds configuration to bind on. Should not be called after thread start.
-         *
-         * @param cfg Configuration.
-         * @param mgmt Management flag.
-         */
-        public void addConfiguration(Map<String, String> cfg, boolean mgmt) {
-            bindCfgs.add(F.t(cfg, mgmt));
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            kernalStartLatch.await();
-
-            while (!isCancelled()) {
-                Thread.sleep(REBIND_INTERVAL);
-
-                Iterator<IgniteBiTuple<Map<String, String>, Boolean>> it = bindCfgs.iterator();
-
-                while (it.hasNext()) {
-                    IgniteBiTuple<Map<String, String>, Boolean> cfg = it.next();
-
-                    IgfsServer ipcSrv = new IgfsServer(ggfsCtx, cfg.get1(), cfg.get2());
-
-                    try {
-                        ipcSrv.start();
-
-                        ipcSrv.onKernalStart();
-
-                        srvrs.add(ipcSrv);
-
-                        it.remove();
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (GridWorker.log.isDebugEnabled())
-                            GridWorker.log.debug("Failed to bind GGFS endpoint [cfg=" + cfg + ", err=" + e.getMessage() + ']');
-                    }
-                }
-
-                if (bindCfgs.isEmpty())
-                    break;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsAsyncImpl.java
new file mode 100644
index 0000000..c030625
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsAsyncImpl.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.ignitefs.mapreduce.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.util.*;
+
+/**
+ * Ggfs supporting asynchronous operations.
+ */
+public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFs> implements IgfsEx {
+    /** */
+    private final IgfsImpl ggfs;
+
+    /**
+     * @param ggfs Ggfs.
+     */
+    public IgfsAsyncImpl(IgfsImpl ggfs) {
+        super(true);
+
+        this.ggfs = ggfs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void format() {
+        try {
+            saveOrGet(ggfs.formatAsync());
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
+        Collection<IgniteFsPath> paths, @Nullable T arg) {
+        try {
+            return saveOrGet(ggfs.executeAsync(task, rslvr, paths, arg));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr,
+        Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) {
+        try {
+            return saveOrGet(ggfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
+        @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) {
+        try {
+            return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, arg));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls,
+        @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles,
+        long maxRangeLen, @Nullable T arg) {
+        try {
+            return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        ggfs.stop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsContext context() {
+        return ggfs.context();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPaths proxyPaths() {
+        return ggfs.proxyPaths();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgniteFsPath path, int bufSize,
+        int seqReadsBeforePrefetch) {
+        return ggfs.open(path, bufSize, seqReadsBeforePrefetch);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgniteFsPath path) {
+        return ggfs.open(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsInputStreamAdapter open(IgniteFsPath path, int bufSize) {
+        return ggfs.open(path, bufSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsStatus globalSpace() throws IgniteCheckedException {
+        return ggfs.globalSpace();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException {
+        ggfs.globalSampling(val);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Boolean globalSampling() {
+        return ggfs.globalSampling();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsLocalMetrics localMetrics() {
+        return ggfs.localMetrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long groupBlockSize() {
+        return ggfs.groupBlockSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
+        return ggfs.awaitDeletesAsync();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String clientLogDirectory() {
+        return ggfs.clientLogDirectory();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clientLogDirectory(String logDir) {
+        ggfs.clientLogDirectory(logDir);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean evictExclude(IgniteFsPath path, boolean primary) {
+        return ggfs.evictExclude(path, primary);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid nextAffinityKey() {
+        return ggfs.nextAffinityKey();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isProxy(URI path) {
+        return ggfs.isProxy(path);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String name() {
+        return ggfs.name();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFsConfiguration configuration() {
+        return ggfs.configuration();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFsPathSummary summary(IgniteFsPath path) {
+        return ggfs.summary(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFsOutputStream create(IgniteFsPath path, boolean overwrite) {
+        return ggfs.create(path, overwrite);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFsOutputStream create(IgniteFsPath path, int bufSize, boolean overwrite, int replication,
+        long blockSize, @Nullable Map<String, String> props) {
+        return ggfs.create(path, bufSize, overwrite, replication, blockSize, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFsOutputStream create(IgniteFsPath path, int bufSize, boolean overwrite,
+        @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) {
+        return ggfs.create(path, bufSize, overwrite, affKey, replication, blockSize, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFsOutputStream append(IgniteFsPath path, boolean create) {
+        return ggfs.append(path, create);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFsOutputStream append(IgniteFsPath path, int bufSize, boolean create,
+        @Nullable Map<String, String> props) {
+        return ggfs.append(path, bufSize, create, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTimes(IgniteFsPath path, long accessTime, long modificationTime) {
+        ggfs.setTimes(path, accessTime, modificationTime);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgniteFsBlockLocation> affinity(IgniteFsPath path, long start, long len) {
+        return ggfs.affinity(path, start, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgniteFsBlockLocation> affinity(IgniteFsPath path, long start, long len, long maxLen) {
+        return ggfs.affinity(path, start, len, maxLen);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFsMetrics metrics() {
+        return ggfs.metrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resetMetrics() {
+        ggfs.resetMetrics();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long size(IgniteFsPath path) {
+        return ggfs.size(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists(IgniteFsPath path) {
+        return ggfs.exists(path);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteFsFile update(IgniteFsPath path, Map<String, String> props) {
+        return ggfs.update(path, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rename(IgniteFsPath src, IgniteFsPath dest) {
+        ggfs.rename(src, dest);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(IgniteFsPath path, boolean recursive) {
+        return ggfs.delete(path, recursive);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgniteFsPath path) {
+        ggfs.mkdirs(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgniteFsPath path, @Nullable Map<String, String> props) {
+        ggfs.mkdirs(path, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgniteFsPath> listPaths(IgniteFsPath path) {
+        return ggfs.listPaths(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgniteFsFile> listFiles(IgniteFsPath path) {
+        return ggfs.listFiles(path);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteFsFile info(IgniteFsPath path) {
+        return ggfs.info(path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long usedSpaceSize() {
+        return ggfs.usedSpaceSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, String> properties() {
+        return ggfs.properties();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsAttributes.java
new file mode 100644
index 0000000..1e549d9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsAttributes.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * GGFS attributes.
+ * <p>
+ * This class contains information on a single GGFS configured on some node.
+ */
+public class IgfsAttributes implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** GGFS name. */
+    private String ggfsName;
+
+    /** File's data block size (bytes). */
+    private int blockSize;
+
+    /** Size of the group figured in {@link org.apache.ignite.ignitefs.IgniteFsGroupDataBlocksKeyMapper}. */
+    private int grpSize;
+
+    /** Meta cache name. */
+    private String metaCacheName;
+
+    /** Data cache name. */
+    private String dataCacheName;
+
+    /** Default mode. */
+    private IgniteFsMode dfltMode;
+
+    /** Fragmentizer enabled flag. */
+    private boolean fragmentizerEnabled;
+
+    /** Path modes. */
+    private Map<String, IgniteFsMode> pathModes;
+
+    /**
+     * @param ggfsName GGFS name.
+     * @param blockSize File's data block size (bytes).
+     * @param grpSize Size of the group figured in {@link org.apache.ignite.ignitefs.IgniteFsGroupDataBlocksKeyMapper}.
+     * @param metaCacheName Meta cache name.
+     * @param dataCacheName Data cache name.
+     * @param dfltMode Default mode.
+     * @param pathModes Path modes.
+     */
+    public IgfsAttributes(String ggfsName, int blockSize, int grpSize, String metaCacheName, String dataCacheName,
+        IgniteFsMode dfltMode, Map<String, IgniteFsMode> pathModes, boolean fragmentizerEnabled) {
+        this.blockSize = blockSize;
+        this.ggfsName = ggfsName;
+        this.grpSize = grpSize;
+        this.metaCacheName = metaCacheName;
+        this.dataCacheName = dataCacheName;
+        this.dfltMode = dfltMode;
+        this.pathModes = pathModes;
+        this.fragmentizerEnabled = fragmentizerEnabled;
+    }
+
+    /**
+     * Public no-arg constructor for {@link Externalizable}.
+     */
+    public IgfsAttributes() {
+        // No-op.
+    }
+
+    /**
+     * @return GGFS name.
+     */
+    public String ggfsName() {
+        return ggfsName;
+    }
+
+    /**
+     * @return File's data block size (bytes).
+     */
+    public int blockSize() {
+        return blockSize;
+    }
+
+    /**
+     * @return Size of the group figured in {@link org.apache.ignite.ignitefs.IgniteFsGroupDataBlocksKeyMapper}.
+     */
+    public int groupSize() {
+        return grpSize;
+    }
+
+    /**
+     * @return Metadata cache name.
+     */
+    public String metaCacheName() {
+        return metaCacheName;
+    }
+
+    /**
+     * @return Data cache name.
+     */
+    public String dataCacheName() {
+        return dataCacheName;
+    }
+
+    /**
+     * @return Default mode.
+     */
+    public IgniteFsMode defaultMode() {
+        return dfltMode;
+    }
+
+    /**
+     * @return Path modes.
+     */
+    public Map<String, IgniteFsMode> pathModes() {
+        return pathModes != null ? Collections.unmodifiableMap(pathModes) : null;
+    }
+
+    /**
+     * @return {@code True} if fragmentizer is enabled.
+     */
+    public boolean fragmentizerEnabled() {
+        return fragmentizerEnabled;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, ggfsName);
+        out.writeInt(blockSize);
+        out.writeInt(grpSize);
+        U.writeString(out, metaCacheName);
+        U.writeString(out, dataCacheName);
+        U.writeEnum(out, dfltMode);
+        out.writeBoolean(fragmentizerEnabled);
+
+        if (pathModes != null) {
+            out.writeBoolean(true);
+
+            out.writeInt(pathModes.size());
+
+            for (Map.Entry<String, IgniteFsMode> pathMode : pathModes.entrySet()) {
+                U.writeString(out, pathMode.getKey());
+                U.writeEnum(out, pathMode.getValue());
+            }
+        }
+        else
+            out.writeBoolean(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        ggfsName = U.readString(in);
+        blockSize = in.readInt();
+        grpSize = in.readInt();
+        metaCacheName = U.readString(in);
+        dataCacheName = U.readString(in);
+        dfltMode = IgniteFsMode.fromOrdinal(in.readByte());
+        fragmentizerEnabled = in.readBoolean();
+
+        if (in.readBoolean()) {
+            int size = in.readInt();
+
+            pathModes = new HashMap<>(size, 1.0f);
+
+            for (int i = 0; i < size; i++)
+                pathModes.put(U.readString(in), IgniteFsMode.fromOrdinal(in.readByte()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlockKey.java
new file mode 100644
index 0000000..81f0a36
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlockKey.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.internal.processors.task.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * File's binary data block key.
+ */
+@GridInternal
+public final class IgfsBlockKey extends MessageAdapter implements Externalizable, Comparable<IgfsBlockKey> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** File system file ID. */
+    private IgniteUuid fileId;
+
+    /** Block ID. */
+    private long blockId;
+
+    /** Block affinity key. */
+    private IgniteUuid affKey;
+
+    /** Eviction exclude flag. */
+    private boolean evictExclude;
+
+    /**
+     * Constructs file's binary data block key.
+     *
+     * @param fileId File ID.
+     * @param affKey Affinity key.
+     * @param evictExclude Evict exclude flag.
+     * @param blockId Block ID.
+     */
+    public IgfsBlockKey(IgniteUuid fileId, @Nullable IgniteUuid affKey, boolean evictExclude, long blockId) {
+        assert fileId != null;
+        assert blockId >= 0;
+
+        this.fileId = fileId;
+        this.affKey = affKey;
+        this.evictExclude = evictExclude;
+        this.blockId = blockId;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public IgfsBlockKey() {
+        // No-op.
+    }
+
+    /**
+     * @return File ID.
+     */
+    public IgniteUuid getFileId() {
+        return fileId;
+    }
+
+    /**
+     * @return Block affinity key.
+     */
+    public IgniteUuid affinityKey() {
+        return affKey;
+    }
+
+    /**
+     * @return Evict exclude flag.
+     */
+    public boolean evictExclude() {
+        return evictExclude;
+    }
+
+    /**
+     * @return Block ID.
+     */
+    public long getBlockId() {
+        return blockId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(@NotNull IgfsBlockKey o) {
+        int res = fileId.compareTo(o.fileId);
+
+        if (res != 0)
+            return res;
+
+        long v1 = blockId;
+        long v2 = o.blockId;
+
+        if (v1 != v2)
+            return v1 > v2 ? 1 : -1;
+
+        if (affKey == null && o.affKey == null)
+            return 0;
+
+        if (affKey != null && o.affKey != null)
+            return affKey.compareTo(o.affKey);
+
+        return affKey != null ? -1 : 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeGridUuid(out, fileId);
+        U.writeGridUuid(out, affKey);
+        out.writeBoolean(evictExclude);
+        out.writeLong(blockId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        fileId = U.readGridUuid(in);
+        affKey = U.readGridUuid(in);
+        evictExclude = in.readBoolean();
+        blockId = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return fileId.hashCode() + (int)(blockId ^ (blockId >>> 32));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || o.getClass() != getClass())
+            return false;
+
+        IgfsBlockKey that = (IgfsBlockKey)o;
+
+        return blockId == that.blockId && fileId.equals(that.fileId) && F.eq(affKey, that.affKey) &&
+            evictExclude == that.evictExclude;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public MessageAdapter clone() {
+        IgfsBlockKey _clone = new IgfsBlockKey();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(MessageAdapter _msg) {
+        IgfsBlockKey _clone = (IgfsBlockKey)_msg;
+
+        _clone.fileId = fileId;
+        _clone.blockId = blockId;
+        _clone.affKey = affKey;
+        _clone.evictExclude = evictExclude;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("fallthrough")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        writer.setBuffer(buf);
+
+        if (!typeWritten) {
+            if (!writer.writeByte(null, directType()))
+                return false;
+
+            typeWritten = true;
+        }
+
+        switch (state) {
+            case 0:
+                if (!writer.writeIgniteUuid("affKey", affKey))
+                    return false;
+
+                state++;
+
+            case 1:
+                if (!writer.writeLong("blockId", blockId))
+                    return false;
+
+                state++;
+
+            case 2:
+                if (!writer.writeBoolean("evictExclude", evictExclude))
+                    return false;
+
+                state++;
+
+            case 3:
+                if (!writer.writeIgniteUuid("fileId", fileId))
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("fallthrough")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        reader.setBuffer(buf);
+
+        switch (state) {
+            case 0:
+                affKey = reader.readIgniteUuid("affKey");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 1:
+                blockId = reader.readLong("blockId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 2:
+                evictExclude = reader.readBoolean("evictExclude");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 3:
+                fileId = reader.readIgniteUuid("fileId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 65;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsBlockKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlockLocationImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlockLocationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlockLocationImpl.java
new file mode 100644
index 0000000..f95da41
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlockLocationImpl.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * File block location in the grid.
+ */
+public class IgfsBlockLocationImpl implements IgniteFsBlockLocation, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long start;
+
+    /** */
+    private long len;
+
+    /** */
+    @GridToStringInclude
+    private Collection<UUID> nodeIds;
+
+    /** */
+    private Collection<String> names;
+
+    /** */
+    private Collection<String> hosts;
+
+    /**
+     * Empty constructor for externalizable.
+     */
+    public IgfsBlockLocationImpl() {
+        // No-op.
+    }
+
+    /**
+     * @param location HDFS block location.
+     * @param len New length.
+     */
+    public IgfsBlockLocationImpl(IgniteFsBlockLocation location, long len) {
+        assert location != null;
+
+        start = location.start();
+        this.len = len;
+
+        nodeIds = location.nodeIds();
+        names = location.names();
+        hosts = location.hosts();
+    }
+
+    /**
+     * @param start Start.
+     * @param len Length.
+     * @param nodes Affinity nodes.
+     */
+    public IgfsBlockLocationImpl(long start, long len, Collection<ClusterNode> nodes) {
+        assert start >= 0;
+        assert len > 0;
+        assert nodes != null && !nodes.isEmpty();
+
+        this.start = start;
+        this.len = len;
+
+        convertFromNodes(nodes);
+    }
+
+    /**
+     * @return Start position.
+     */
+    @Override public long start() {
+        return start;
+    }
+
+    /**
+     * @return Length.
+     */
+    @Override public long length() {
+        return len;
+    }
+
+    /**
+     * @return Node IDs.
+     */
+    @Override public Collection<UUID> nodeIds() {
+        return nodeIds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> names() {
+        return names;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> hosts() {
+        return hosts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = (int)(start ^ (start >>> 32));
+
+        res = 31 * res + (int)(len ^ (len >>> 32));
+        res = 31 * res + nodeIds.hashCode();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        IgfsBlockLocationImpl that = (IgfsBlockLocationImpl)o;
+
+        return len == that.len && start == that.start && F.eq(nodeIds, that.nodeIds) && F.eq(names, that.names) &&
+            F.eq(hosts, that.hosts);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsBlockLocationImpl.class, this);
+    }
+
+    /**
+     * Writes this object to data output. Note that this is not externalizable
+     * interface because we want to eliminate any marshaller.
+     *
+     * @param out Data output to write.
+     * @throws IOException If write failed.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        assert names != null;
+        assert hosts != null;
+
+        out.writeLong(start);
+        out.writeLong(len);
+
+        out.writeBoolean(nodeIds != null);
+
+        if (nodeIds != null) {
+            out.writeInt(nodeIds.size());
+
+            for (UUID nodeId : nodeIds)
+                U.writeUuid(out, nodeId);
+        }
+
+        out.writeInt(names.size());
+
+        for (String name : names)
+            out.writeUTF(name);
+
+        out.writeInt(hosts.size());
+
+        for (String host : hosts)
+            out.writeUTF(host);
+    }
+
+    /**
+     * Reads object from data input. Note we do not use externalizable interface
+     * to eliminate marshaller.
+     *
+     * @param in Data input.
+     * @throws IOException If read failed.
+     */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        start = in.readLong();
+        len = in.readLong();
+
+        int size;
+
+        if (in.readBoolean()) {
+            size = in.readInt();
+
+            nodeIds = new ArrayList<>(size);
+
+            for (int i = 0; i < size; i++)
+                nodeIds.add(U.readUuid(in));
+        }
+
+        size = in.readInt();
+
+        names = new ArrayList<>(size);
+
+        for (int i = 0; i < size; i++)
+            names.add(in.readUTF());
+
+        size = in.readInt();
+
+        hosts = new ArrayList<>(size);
+
+        for (int i = 0; i < size; i++)
+            hosts.add(in.readUTF());
+    }
+
+    /**
+     * Converts collection of rich nodes to block location data.
+     *
+     * @param nodes Collection of affinity nodes.
+     */
+    private void convertFromNodes(Collection<ClusterNode> nodes) {
+        Collection<String> names = new LinkedHashSet<>();
+        Collection<String> hosts = new LinkedHashSet<>();
+        Collection<UUID> nodeIds = new ArrayList<>(nodes.size());
+
+        for (final ClusterNode node : nodes) {
+            // Normalize host names into Hadoop-expected format.
+            try {
+                Collection<InetAddress> addrs = U.toInetAddresses(node);
+
+                for (InetAddress addr : addrs) {
+                    if (addr.getHostName() == null)
+                        names.add(addr.getHostAddress() + ":" + 9001);
+                    else {
+                        names.add(addr.getHostName() + ":" + 9001); // hostname:portNumber
+                        hosts.add(addr.getHostName());
+                    }
+                }
+            }
+            catch (IgniteCheckedException ignored) {
+                names.addAll(node.addresses());
+            }
+
+            nodeIds.add(node.id());
+        }
+
+        this.nodeIds = nodeIds;
+        this.names = names;
+        this.hosts = hosts;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlocksMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlocksMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlocksMessage.java
index e7934df..95a9430 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlocksMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsBlocksMessage.java
@@ -39,8 +39,8 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage {
     private long id;
 
     /** Blocks to store. */
-    @GridDirectMap(keyType = GridGgfsBlockKey.class, valueType = byte[].class)
-    private Map<GridGgfsBlockKey, byte[]> blocks;
+    @GridDirectMap(keyType = IgfsBlockKey.class, valueType = byte[].class)
+    private Map<IgfsBlockKey, byte[]> blocks;
 
     /**
      * Empty constructor required by {@link Externalizable}
@@ -56,7 +56,7 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage {
      * @param id Message id.
      * @param blocks Blocks to put in cache.
      */
-    public IgfsBlocksMessage(IgniteUuid fileId, long id, Map<GridGgfsBlockKey, byte[]> blocks) {
+    public IgfsBlocksMessage(IgniteUuid fileId, long id, Map<IgfsBlockKey, byte[]> blocks) {
         this.fileId = fileId;
         this.id = id;
         this.blocks = blocks;
@@ -79,7 +79,7 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage {
     /**
      * @return Map of blocks to put in cache.
      */
-    public Map<GridGgfsBlockKey, byte[]> blocks() {
+    public Map<IgfsBlockKey, byte[]> blocks() {
         return blocks;
     }
 
@@ -121,7 +121,7 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage {
 
         switch (state) {
             case 0:
-                if (!writer.writeMap("blocks", blocks, GridGgfsBlockKey.class, byte[].class))
+                if (!writer.writeMap("blocks", blocks, IgfsBlockKey.class, byte[].class))
                     return false;
 
                 state++;
@@ -153,7 +153,7 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage {
 
         switch (state) {
             case 0:
-                blocks = reader.readMap("blocks", GridGgfsBlockKey.class, byte[].class, false);
+                blocks = reader.readMap("blocks", IgfsBlockKey.class, byte[].class, false);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsClientSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsClientSession.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsClientSession.java
new file mode 100644
index 0000000..09b09bc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsClientSession.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * GGFS client session. Effectively used to manage lifecycle of opened resources and close them on
+ * connection close.
+ */
+public class IgfsClientSession {
+    /** Session resources. */
+    private ConcurrentMap<Long, Closeable> rsrcMap = new ConcurrentHashMap8<>();
+
+    /**
+     * Registers resource within this session.
+     *
+     * @param rsrcId Resource id.
+     * @param rsrc Resource to register.
+     */
+    public boolean registerResource(long rsrcId, Closeable rsrc) {
+        Object old = rsrcMap.putIfAbsent(rsrcId, rsrc);
+
+        return old == null;
+    }
+
+    /**
+     * Gets registered resource by ID.
+     *
+     * @param rsrcId Resource ID.
+     * @return Resource or {@code null} if resource was not found.
+     */
+    @Nullable public <T> T resource(Long rsrcId) {
+        return (T)rsrcMap.get(rsrcId);
+    }
+
+    /**
+     * Unregister previously registered resource.
+     *
+     * @param rsrcId Resource ID.
+     * @param rsrc Resource to unregister.
+     * @return {@code True} if resource was unregistered, {@code false} if no resource
+     *      is associated with this ID or other resource is associated with this ID.
+     */
+    public boolean unregisterResource(Long rsrcId, Closeable rsrc) {
+        return rsrcMap.remove(rsrcId, rsrc);
+    }
+
+    /**
+     * @return Registered resources iterator.
+     */
+    public Iterator<Closeable> registeredResources() {
+        return rsrcMap.values().iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsContext.java
new file mode 100644
index 0000000..95ecbc2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsContext.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.*;
+
+/**
+ * GGFS context holding all required components for GGFS instance.
+ */
+public class IgfsContext {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Configuration. */
+    private final IgniteFsConfiguration cfg;
+
+    /** Managers. */
+    private List<IgfsManager> mgrs = new LinkedList<>();
+
+    /** Meta manager. */
+    private final IgfsMetaManager metaMgr;
+
+    /** Data manager. */
+    private final IgfsDataManager dataMgr;
+
+    /** Server manager. */
+    private final IgfsServerManager srvMgr;
+
+    /** Fragmentizer manager. */
+    private final IgfsFragmentizerManager fragmentizerMgr;
+
+    /** GGFS instance. */
+    private final IgfsEx ggfs;
+
+    /**
+     * @param ctx Kernal context.
+     * @param cfg GGFS configuration.
+     * @param metaMgr Meta manager.
+     * @param dataMgr Data manager.
+     * @param srvMgr Server manager.
+     * @param fragmentizerMgr Fragmentizer manager.
+     * @throws IgniteCheckedException If GGFs context instantiation is failed.
+     */
+    public IgfsContext(
+        GridKernalContext ctx,
+        IgniteFsConfiguration cfg,
+        IgfsMetaManager metaMgr,
+        IgfsDataManager dataMgr,
+        IgfsServerManager srvMgr,
+        IgfsFragmentizerManager fragmentizerMgr
+    ) throws IgniteCheckedException {
+        this.ctx = ctx;
+        this.cfg = cfg;
+
+        this.metaMgr = add(metaMgr);
+        this.dataMgr = add(dataMgr);
+        this.srvMgr = add(srvMgr);
+        this.fragmentizerMgr = add(fragmentizerMgr);
+
+        ggfs = new IgfsImpl(this);
+    }
+
+    /**
+     * @return GGFS instance.
+     */
+    public IgfsEx ggfs() {
+        return ggfs;
+    }
+
+    /**
+     * @return Kernal context.
+     */
+    public GridKernalContext kernalContext() {
+        return ctx;
+    }
+
+    /**
+     * @return GGFS configuration.
+     */
+    public IgniteFsConfiguration configuration() {
+        return cfg;
+    }
+
+    /**
+     * @return List of managers, in starting order.
+     */
+    public List<IgfsManager> managers() {
+        return mgrs;
+    }
+
+    /**
+     * @return Meta manager.
+     */
+    public IgfsMetaManager meta() {
+        return metaMgr;
+    }
+
+    /**
+     * @return Data manager.
+     */
+    public IgfsDataManager data() {
+        return dataMgr;
+    }
+
+    /**
+     * @return Server manager.
+     */
+    public IgfsServerManager server() {
+        return srvMgr;
+    }
+
+    /**
+     * @return Fragmentizer manager.
+     */
+    public IgfsFragmentizerManager fragmentizer() {
+        return fragmentizerMgr;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param topic Topic.
+     * @param msg Message.
+     * @param plc Policy.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void send(UUID nodeId, Object topic, IgfsCommunicationMessage msg, GridIoPolicy plc)
+        throws IgniteCheckedException {
+        if (!kernalContext().localNodeId().equals(nodeId))
+            msg.prepareMarshal(kernalContext().config().getMarshaller());
+
+        kernalContext().io().send(nodeId, topic, msg, plc);
+    }
+
+    /**
+     * @param node Node.
+     * @param topic Topic.
+     * @param msg Message.
+     * @param plc Policy.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void send(ClusterNode node, Object topic, IgfsCommunicationMessage msg, GridIoPolicy plc)
+        throws IgniteCheckedException {
+        if (!kernalContext().localNodeId().equals(node.id()))
+            msg.prepareMarshal(kernalContext().config().getMarshaller());
+
+        kernalContext().io().send(node, topic, msg, plc);
+    }
+
+    /**
+     * Checks if given node is a GGFS node.
+     *
+     * @param node Node to check.
+     * @return {@code True} if node has GGFS with this name, {@code false} otherwise.
+     */
+    public boolean ggfsNode(ClusterNode node) {
+        assert node != null;
+
+        IgfsAttributes[] ggfs = node.attribute(ATTR_GGFS);
+
+        if (ggfs != null)
+            for (IgfsAttributes attrs : ggfs)
+                if (F.eq(cfg.getName(), attrs.ggfsName()))
+                    return true;
+
+        return false;
+    }
+
+    /**
+     * Adds manager to managers list.
+     *
+     * @param mgr Manager.
+     * @return Added manager.
+     */
+    private <T extends IgfsManager> T add(@Nullable T mgr) {
+        if (mgr != null)
+            mgrs.add(mgr);
+
+        return mgr;
+    }
+}


Mime
View raw message