ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [4/6] ignite git commit: WIP. Wired all managers and context.
Date Wed, 27 Jul 2016 06:56:15 GMT
WIP. Wired all managers and context.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/77d213ea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/77d213ea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/77d213ea

Branch: refs/heads/ignite-3553
Commit: 77d213ea80a052efd649193d5e9661e6f180a2df
Parents: bb6e87a
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Jul 26 15:01:28 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Jul 26 15:01:28 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsContext.java   | 17 +++++++
 .../processors/igfs/IgfsDataManager.java        | 35 +++++++++-----
 .../igfs/IgfsFragmentizerManager.java           | 41 ++++++++++------
 .../internal/processors/igfs/IgfsManager.java   | 22 +++++++--
 .../processors/igfs/IgfsMetaManager.java        | 31 ++++++------
 .../internal/processors/igfs/IgfsProcessor.java | 26 +++++++---
 .../processors/igfs/IgfsServerManager.java      | 13 ++++-
 .../igfs/client/IgfsClientClosureManager.java   | 51 ++++++++++++--------
 .../igfs/client/IgfsClientClosureResponse.java  |  2 +-
 9 files changed, 160 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/77d213ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index a638bf3..3f67156 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientClosureManager;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -39,6 +40,9 @@ public class IgfsContext {
     /** Managers. */
     private List<IgfsManager> mgrs = new LinkedList<>();
 
+    /** Closure manager. */
+    private final IgfsClientClosureManager cloMgr;
+
     /** Meta manager. */
     private final IgfsMetaManager metaMgr;
 
@@ -55,8 +59,11 @@ public class IgfsContext {
     private final IgfsEx igfs;
 
     /**
+     * Constructor.
+     *
      * @param ctx Kernal context.
      * @param cfg IGFS configuration.
+     * @param cloMgr Closure manager.
      * @param metaMgr Meta manager.
      * @param dataMgr Data manager.
      * @param srvMgr Server manager.
@@ -66,6 +73,7 @@ public class IgfsContext {
     public IgfsContext(
         GridKernalContext ctx,
         FileSystemConfiguration cfg,
+        IgfsClientClosureManager cloMgr,
         IgfsMetaManager metaMgr,
         IgfsDataManager dataMgr,
         IgfsServerManager srvMgr,
@@ -74,6 +82,8 @@ public class IgfsContext {
         this.ctx = ctx;
         this.cfg = cfg;
 
+        this.cloMgr = cloMgr;
+
         this.metaMgr = add(metaMgr);
         this.dataMgr = add(dataMgr);
         this.srvMgr = add(srvMgr);
@@ -111,6 +121,13 @@ public class IgfsContext {
     }
 
     /**
+     * @return Closure manager.
+     */
+    public IgfsClientClosureManager closure() {
+        return cloMgr;
+    }
+
+    /**
      * @return Meta manager.
      */
     public IgfsMetaManager meta() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/77d213ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 1397e4e..07d80dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -33,6 +33,7 @@ import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
 import org.apache.ignite.igfs.IgfsOutOfSpaceException;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -143,7 +144,16 @@ public class IgfsDataManager extends IgfsManager {
         new ConcurrentHashMap8<>();
 
     /**
+     * Constructor.
      *
+     * @param ctx Kernal context.
+     */
+    public IgfsDataManager(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /**
+     * Await data cache initialization.
      */
     void awaitInit() {
         try {
@@ -162,7 +172,7 @@ public class IgfsDataManager extends IgfsManager {
 
         topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
 
-        igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() {
+        ctx.io().addMessageListener(topic, new GridMessageListener() {
             @Override public void onMessage(UUID nodeId, Object msg) {
                 if (msg instanceof IgfsBlocksMessage)
                     processBlocksMessage(nodeId, (IgfsBlocksMessage)msg);
@@ -171,7 +181,7 @@ public class IgfsDataManager extends IgfsManager {
             }
         });
 
-        igfsCtx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
+        ctx.event().addLocalEventListener(new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
                 assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
 
@@ -186,16 +196,15 @@ public class IgfsDataManager extends IgfsManager {
             }
         }, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
-        igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService();
+        igfsSvc = ctx.getIgfsExecutorService();
 
-        delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(),
-            "igfs-" + igfsName + "-delete-worker", log);
+        delWorker = new AsyncDeleteWorker(ctx.gridName(), "igfs-" + igfsName + "-delete-worker", log);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected void onKernalStart0() throws IgniteCheckedException {
-        dataCachePrj = igfsCtx.kernalContext().cache().getOrStartCache(igfsCtx.configuration().getDataCacheName());
+        dataCachePrj = ctx.cache().getOrStartCache(igfsCtx.configuration().getDataCacheName());
 
         assert dataCachePrj != null;
 
@@ -203,7 +212,7 @@ public class IgfsDataManager extends IgfsManager {
 
         metrics = igfsCtx.igfs().localMetrics();
 
-        AffinityKeyMapper mapper = igfsCtx.kernalContext().cache()
+        AffinityKeyMapper mapper = ctx.cache()
             .internalCache(igfsCtx.configuration().getDataCacheName()).configuration().getAffinityMapper();
 
         grpSize = mapper instanceof IgfsGroupDataBlocksKeyMapper ?
@@ -213,7 +222,7 @@ public class IgfsDataManager extends IgfsManager {
 
         assert grpBlockSize != 0;
 
-        igfsCtx.kernalContext().cache().internalCache(igfsCtx.configuration().getDataCacheName()).preloader()
+        ctx.cache().internalCache(igfsCtx.configuration().getDataCacheName()).preloader()
             .startFuture().listen(new CI1<IgniteInternalFuture<Object>>() {
             @Override public void apply(IgniteInternalFuture<Object> f) {
                 dataCacheStartLatch.countDown();
@@ -266,7 +275,7 @@ public class IgfsDataManager extends IgfsManager {
         if (!dataCache.context().affinityNode())
             return null;
 
-        UUID nodeId = igfsCtx.kernalContext().localNodeId();
+        UUID nodeId = ctx.localNodeId();
 
         if (prevAffKey != null && dataCache.affinity().mapKeyToNode(prevAffKey).isLocal())
             return prevAffKey;
@@ -296,7 +305,7 @@ public class IgfsDataManager extends IgfsManager {
      */
     private IgniteDataStreamer<IgfsBlockKey, byte[]> dataStreamer() {
         IgniteDataStreamer<IgfsBlockKey, byte[]> ldr =
-            igfsCtx.kernalContext().<IgfsBlockKey, byte[]>dataStream().dataStreamer(dataCachePrj.name());
+            ctx.<IgfsBlockKey, byte[]>dataStream().dataStreamer(dataCachePrj.name());
 
         FileSystemConfiguration cfg = igfsCtx.configuration();
 
@@ -331,7 +340,7 @@ public class IgfsDataManager extends IgfsManager {
         final IgfsBlockKey key = blockKey(blockIdx, fileInfo);
 
         if (log.isDebugEnabled() &&
-            dataCache.affinity().isPrimaryOrBackup(igfsCtx.kernalContext().discovery().localNode(), key)) {
+            dataCache.affinity().isPrimaryOrBackup(ctx.discovery().localNode(), key)) {
             log.debug("Reading non-local data block [path=" + path + ", fileInfo=" + fileInfo +
                 ", blockIdx=" + blockIdx + ']');
         }
@@ -1022,7 +1031,7 @@ public class IgfsDataManager extends IgfsManager {
                 ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']');
 
             completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " +
-                igfsCtx.kernalContext().localNodeId(), e));
+                ctx.localNodeId(), e));
 
             return;
         }
@@ -1149,7 +1158,7 @@ public class IgfsDataManager extends IgfsManager {
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
     private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) {
         try {
-            ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
+            ackMsg.finishUnmarshal(ctx.config().getMarshaller(), null);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to unmarshal message (will ignore): " + ackMsg, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/77d213ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index d64c64a..2e3843c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -99,13 +100,22 @@ public class IgfsFragmentizerManager extends IgfsManager {
     /** Message topic. */
     private Object topic;
 
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    public IgfsFragmentizerManager(GridKernalContext ctx) {
+        super(ctx);
+    }
+
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         if (!igfsCtx.configuration().isFragmentizerEnabled())
             return;
 
         // We care only about node leave and fail events.
-        igfsCtx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
+        ctx.event().addLocalEventListener(new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
                 assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
 
@@ -121,7 +131,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
 
         topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
 
-        igfsCtx.kernalContext().io().addMessageListener(topic, fragmentizerWorker);
+        ctx.io().addMessageListener(topic, fragmentizerWorker);
 
         new IgniteThread(fragmentizerWorker).start();
     }
@@ -130,7 +140,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
     @Override protected void onKernalStart0() throws IgniteCheckedException {
         if (igfsCtx.configuration().isFragmentizerEnabled()) {
             // Check at startup if this node is a fragmentizer coordinator.
-            DiscoveryEvent locJoinEvt = igfsCtx.kernalContext().discovery().localJoinEvent();
+            DiscoveryEvent locJoinEvt = ctx.discovery().localJoinEvent();
 
             checkLaunchCoordinator(locJoinEvt);
         }
@@ -191,7 +201,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
                 return;
             }
             catch (IgniteCheckedException e) {
-                if (!igfsCtx.kernalContext().discovery().alive(nodeId))
+                if (!ctx.discovery().alive(nodeId))
                     throw new ClusterTopologyCheckedException("Failed to send message (node left the grid) " +
                         "[nodeId=" + nodeId + ", msg=" + msg + ']');
 
@@ -226,7 +236,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
                         minNodeOrder = node.order();
                 }
 
-                ClusterNode locNode = igfsCtx.kernalContext().grid().localNode();
+                ClusterNode locNode = ctx.grid().localNode();
 
                 if (locNode.order() == minNodeOrder) {
                     if (log.isDebugEnabled())
@@ -263,7 +273,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
      */
     @SuppressWarnings("fallthrough")
     private void processFragmentizerRequest(IgfsFragmentizerRequest req) throws IgniteCheckedException {
-        req.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
+        req.finishUnmarshal(ctx.config().getMarshaller(), null);
 
         Collection<IgfsFileAffinityRange> ranges = req.fragmentRanges();
         IgniteUuid fileId = req.fileId();
@@ -356,11 +366,11 @@ public class IgfsFragmentizerManager extends IgfsManager {
          * Constructor.
          */
         protected FragmentizerCoordinator() {
-            super(igfsCtx.kernalContext().gridName(), "fragmentizer-coordinator",
-                igfsCtx.kernalContext().log(IgfsFragmentizerManager.class));
+            super(ctx.gridName(), "fragmentizer-coordinator",
+                ctx.log(IgfsFragmentizerManager.class));
 
-            igfsCtx.kernalContext().event().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
-            igfsCtx.kernalContext().io().addMessageListener(topic, this);
+            ctx.event().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
+            ctx.io().addMessageListener(topic, this);
         }
 
         /** {@inheritDoc} */
@@ -481,7 +491,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
             else if (msg instanceof IgfsSyncMessage) {
                 IgfsSyncMessage sync = (IgfsSyncMessage)msg;
 
-                if (sync.response() && sync.order() == igfsCtx.kernalContext().grid().localNode().order()) {
+                if (sync.response() && sync.order() == ctx.grid().localNode().order()) {
                     if (log.isDebugEnabled())
                         log.debug("Received fragmentizer sync response from remote node: " + nodeId);
 
@@ -523,7 +533,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
         private void syncStart() throws InterruptedException {
             Collection<UUID> startSync0 = startSync = new GridConcurrentHashSet<>(
                 F.viewReadOnly(
-                    igfsCtx.kernalContext().discovery().allNodes(),
+                    ctx.discovery().allNodes(),
                     F.node2id(),
                     new P1<ClusterNode>() {
                         @Override public boolean apply(ClusterNode n) {
@@ -531,7 +541,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
                         }
                     }));
 
-            ClusterNode locNode = igfsCtx.kernalContext().grid().localNode();
+            ClusterNode locNode = ctx.grid().localNode();
 
             while (!startSync0.isEmpty()) {
                 for (UUID nodeId : startSync0) {
@@ -545,7 +555,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
                         sendWithRetries(nodeId, syncReq);
 
                         // Close window between message sending and discovery event.
-                        if (!igfsCtx.kernalContext().discovery().alive(nodeId))
+                        if (!ctx.discovery().alive(nodeId))
                             startSync0.remove(nodeId);
                     }
                     catch (IgniteCheckedException e) {
@@ -669,8 +679,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
          * Constructor.
          */
         protected FragmentizerWorker() {
-            super(igfsCtx.kernalContext().gridName(), "fragmentizer-worker",
-                igfsCtx.kernalContext().log(IgfsFragmentizerManager.class));
+            super(ctx.gridName(), "fragmentizer-worker", ctx.log(IgfsFragmentizerManager.class));
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/77d213ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java
index cdd277b..56e561b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsManager.java
@@ -20,12 +20,17 @@ package org.apache.ignite.internal.processors.igfs;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract class for IGFS managers.
  */
 public abstract class IgfsManager {
-    /** IGFS context. */
+    /** Kernal context. */
+    protected final GridKernalContext ctx;
+
+    /** IGFS context (not set for shared managers). */
     protected IgfsContext igfsCtx;
 
     /** Logger. */
@@ -35,19 +40,26 @@ public abstract class IgfsManager {
     private AtomicBoolean starting = new AtomicBoolean();
 
     /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    protected IgfsManager(GridKernalContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /**
      * Called when IGFS processor is started.
      *
      * @param igfsCtx IGFS context.
      */
-    public void start(IgfsContext igfsCtx) throws IgniteCheckedException {
+    public void start(@Nullable IgfsContext igfsCtx) throws IgniteCheckedException {
         if (!starting.compareAndSet(false, true))
             assert false : "Method start is called more than once for manager: " + this;
 
-        assert igfsCtx != null;
-
         this.igfsCtx = igfsCtx;
 
-        log = igfsCtx.kernalContext().log(getClass());
+        log = ctx.log(getClass());
 
         start0();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/77d213ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index d891b38..06d65fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -40,6 +40,7 @@ import org.apache.ignite.igfs.IgfsPathIsNotDirectoryException;
 import org.apache.ignite.igfs.IgfsPathNotFoundException;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -159,7 +160,9 @@ public class IgfsMetaManager extends IgfsManager {
      * @param relaxed Relaxed mode flag.
      * @param client Client flag.
      */
-    public IgfsMetaManager(boolean relaxed, boolean client) {
+    public IgfsMetaManager(GridKernalContext ctx, boolean relaxed, boolean client) {
+        super(ctx);
+
         this.relaxed = relaxed;
         this.client = client;
     }
@@ -182,21 +185,21 @@ public class IgfsMetaManager extends IgfsManager {
 
         cfg = igfsCtx.configuration();
 
-        evts = igfsCtx.kernalContext().event();
+        evts = ctx.event();
 
         sampling = new IgfsSamplingKey(cfg.getName());
 
-        log = igfsCtx.kernalContext().log(IgfsMetaManager.class);
+        log = ctx.log(IgfsMetaManager.class);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("RedundantCast")
     @Override protected void onKernalStart0() throws IgniteCheckedException {
-        metaCache = igfsCtx.kernalContext().cache().getOrStartCache(cfg.getMetaCacheName());
+        metaCache = ctx.cache().getOrStartCache(cfg.getMetaCacheName());
 
         assert metaCache != null;
 
-        igfsCtx.kernalContext().cache().internalCache(cfg.getMetaCacheName()).preloader().startFuture()
+        ctx.cache().internalCache(cfg.getMetaCacheName()).preloader().startFuture()
             .listen(new CI1<IgniteInternalFuture<Object>>() {
                 @Override public void apply(IgniteInternalFuture<Object> f) {
                     metaCacheStartLatch.countDown();
@@ -205,7 +208,7 @@ public class IgfsMetaManager extends IgfsManager {
 
         id2InfoPrj = (IgniteInternalCache<IgniteUuid, IgfsEntryInfo>)metaCache.<IgniteUuid, IgfsEntryInfo>cache();
 
-        locNode = igfsCtx.kernalContext().discovery().localNode();
+        locNode = ctx.discovery().localNode();
 
         // Start background delete worker.
         if (!client) {
@@ -266,7 +269,7 @@ public class IgfsMetaManager extends IgfsManager {
         IgniteCompute cliCompute0 = cliCompute;
 
         if (cliCompute0 == null) {
-            IgniteEx ignite = igfsCtx.kernalContext().grid();
+            IgniteEx ignite = ctx.grid();
 
             ClusterGroup cluster = ignite.cluster().forIgfsMetadataDataNodes(cfg.getName(), cfg.getMetaCacheName());
 
@@ -1011,7 +1014,7 @@ public class IgfsMetaManager extends IgfsManager {
                     // Fire events.
                     IgfsPath newPath = new IgfsPath(dstPathIds.path(), dstName);
 
-                    IgfsUtils.sendEvents(igfsCtx.kernalContext(), srcPath, newPath,
+                    IgfsUtils.sendEvents(ctx, srcPath, newPath,
                         srcInfo.isFile() ? EVT_IGFS_FILE_RENAMED : EVT_IGFS_DIR_RENAMED);
                 }
             }
@@ -2916,7 +2919,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                             tx.commit();
 
-                            IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
+                            IgfsUtils.sendEvents(ctx, path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
 
                             return info;
                         }
@@ -3081,7 +3084,7 @@ public class IgfsMetaManager extends IgfsManager {
                             // Prepare result and commit.
                             tx.commit();
 
-                            IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
+                            IgfsUtils.sendEvents(ctx, path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
 
                             return new IgfsCreateResult(newInfo, secondaryOut);
                         }
@@ -3368,18 +3371,18 @@ public class IgfsMetaManager extends IgfsManager {
     private void generateCreateEvents(List<IgfsPath> createdPaths, boolean file) {
         if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
             for (int i = 0; i < createdPaths.size() - 1; i++)
-                IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPaths.get(i),
+                IgfsUtils.sendEvents(ctx, createdPaths.get(i),
                     EventType.EVT_IGFS_DIR_CREATED);
         }
 
         IgfsPath leafPath = createdPaths.get(createdPaths.size() - 1);
 
         if (file) {
-            IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_CREATED);
-            IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_OPENED_WRITE);
+            IgfsUtils.sendEvents(ctx, leafPath, EventType.EVT_IGFS_FILE_CREATED);
+            IgfsUtils.sendEvents(ctx, leafPath, EventType.EVT_IGFS_FILE_OPENED_WRITE);
         }
         else
-            IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_DIR_CREATED);
+            IgfsUtils.sendEvents(ctx, leafPath, EventType.EVT_IGFS_DIR_CREATED);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/77d213ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 85dcb1c..7d6753d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -33,6 +33,7 @@ import org.apache.ignite.igfs.mapreduce.IgfsJob;
 import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientClosureManager;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.ipc.IpcServerEndpoint;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -83,11 +84,16 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
     private final ConcurrentMap<String, IgfsContext> igfsCache =
         new ConcurrentHashMap8<>();
 
+    /** Client closure manager. */
+    private final IgfsClientClosureManager cloMgr;
+
     /**
      * @param ctx Kernal context.
      */
     public IgfsProcessor(GridKernalContext ctx) {
         super(ctx);
+
+        cloMgr = new IgfsClientClosureManager(ctx);
     }
 
     /** {@inheritDoc} */
@@ -127,10 +133,11 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
             IgfsContext igfsCtx = new IgfsContext(
                 ctx,
                 cfg0,
-                new IgfsMetaManager(cfg0.isRelaxedConsistency(), metaClient),
-                new IgfsDataManager(),
-                new IgfsServerManager(),
-                new IgfsFragmentizerManager());
+                cloMgr,
+                new IgfsMetaManager(ctx, cfg0.isRelaxedConsistency(), metaClient),
+                new IgfsDataManager(ctx),
+                new IgfsServerManager(ctx),
+                new IgfsFragmentizerManager(ctx));
 
             // Start managers first.
             for (IgfsManager mgr : igfsCtx.managers())
@@ -139,6 +146,9 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
             igfsCache.put(maskName(cfg0.getName()), igfsCtx);
         }
 
+        // Start closure manager last.
+        cloMgr.start(null);
+
         if (log.isDebugEnabled())
             log.debug("IGFS processor started.");
 
@@ -201,10 +211,14 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
         for (IgfsContext igfsCtx : igfsCache.values())
             for (IgfsManager mgr : igfsCtx.managers())
                 mgr.onKernalStart();
+
+        cloMgr.onKernalStart();
     }
 
     /** {@inheritDoc} */
     @Override public void stop(boolean cancel) {
+        cloMgr.stop(cancel);
+
         // Stop IGFS instances.
         for (IgfsContext igfsCtx : igfsCache.values()) {
             if (log.isDebugEnabled())
@@ -229,6 +243,8 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void onKernalStop(boolean cancel) {
+        cloMgr.onKernalStop(cancel);
+
         for (IgfsContext igfsCtx : igfsCache.values()) {
             if (log.isDebugEnabled())
                 log.debug("Stopping igfs: " + igfsCtx.configuration().getName());
@@ -389,8 +405,6 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
         if (F.isEmpty(locAttrs) || F.isEmpty(rmtAttrs))
             return;
 
-        assert rmtAttrs != null && locAttrs != null;
-
         for (IgfsAttributes rmtAttr : rmtAttrs)
             for (IgfsAttributes locAttr : locAttrs) {
                 // Checking the use of different caches on the different IGFSes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/77d213ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
index c12b367..debc590 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.util.ipc.IpcEndpointBindException;
 import org.apache.ignite.internal.util.ipc.IpcServerEndpoint;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -52,6 +53,15 @@ public class IgfsServerManager extends IgfsManager {
     /** Kernal start latch. */
     private CountDownLatch kernalStartLatch = new CountDownLatch(1);
 
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    public IgfsServerManager(GridKernalContext ctx) {
+        super(ctx);
+    }
+
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         FileSystemConfiguration igfsCfg = igfsCtx.configuration();
@@ -163,8 +173,7 @@ public class IgfsServerManager extends IgfsManager {
          * Constructor.
          */
         private BindWorker() {
-            super(igfsCtx.kernalContext().gridName(), "bind-worker",
-                igfsCtx.kernalContext().log(IgfsServerManager.class));
+            super(ctx.gridName(), "bind-worker", ctx.log(IgfsServerManager.class));
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/77d213ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java
index 7230cf2..9e0f044 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.internal.processors.igfs.client;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.igfs.IgfsContext;
 import org.apache.ignite.internal.processors.igfs.IgfsManager;
 import org.apache.ignite.internal.util.GridStripedSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -38,24 +38,23 @@ public class IgfsClientClosureManager extends IgfsManager {
     /** Pending closures received when manager is not started yet. */
     private final ConcurrentLinkedDeque startupClos = new ConcurrentLinkedDeque();
 
-    /** Kernal context. */
-    private GridKernalContext ctx;
-
-    /** IGFS configuration. */
-    private FileSystemConfiguration igfsCfg;
-
     /** Marshaller. */
-    private Marshaller marsh;
-
-    /** {@inheritDoc} */
-    @Override protected void start0() throws IgniteCheckedException {
-        ctx = igfsCtx.kernalContext();
+    private final Marshaller marsh;
 
-        igfsCfg = igfsCtx.configuration();
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    public IgfsClientClosureManager(GridKernalContext ctx) {
+        super(ctx);
 
         marsh = ctx.config().getMarshaller();
+    }
 
-        ctx.discovery().setCustomEventListener();
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        // TODO
     }
 
     /** {@inheritDoc} */
@@ -74,21 +73,31 @@ public class IgfsClientClosureManager extends IgfsManager {
     }
 
     /**
-     * Execute callable.
+     * Execute IGFS closure.
      *
+     * @param igfsCtx IGFS context.
      * @param clo Closure.
      * @return Result.
      */
-    public <T> T execute(IgfsClientAbstractCallable<T> clo) {
+    public <T> T execute(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo) throws IgniteCheckedException {
+        return executeAsync(igfsCtx, clo).get();
+    }
 
+    /**
+     * Execute IGFS closure asynchronously.
+     *
+     * @param igfsCtx IGFS context.
+     * @param clo Closure.
+     * @return Future.
+     */
+    public <T> IgniteInternalFuture<T> executeAsync(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo) {
+        // TODO
+
+        return null;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgfsClientClosureManager.class, this);
     }
-
-    private class DiscoveryListener implements GridLocalEventListener {
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/77d213ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java
index 3a7694a..d768766 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.igfs.client;
 import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
- * CIGFS client closure execute response.
+ * IGFS client closure execute response.
  */
 public class IgfsClientClosureResponse implements Message {
     /**


Mime
View raw message