ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [06/10] ignite git commit: Wired up pending worker.
Date Thu, 28 Jul 2016 11:05:48 GMT
Wired up pending worker.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9a28f0df
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9a28f0df
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9a28f0df

Branch: refs/heads/ignite-3553
Commit: 9a28f0df79b8e63f0fecf0c2fb83f734f6fc2996
Parents: 5306312
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Wed Jul 27 16:16:27 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed Jul 27 16:16:27 2016 +0300

----------------------------------------------------------------------
 .../igfs/client/IgfsClientManager.java          | 61 +++++++++++++++++---
 1 file changed, 52 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9a28f0df/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
index 49246ea..65584fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
@@ -18,12 +18,14 @@
 package org.apache.ignite.internal.processors.igfs.client;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -33,7 +35,9 @@ import org.apache.ignite.internal.processors.igfs.IgfsManager;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
 import java.util.Map;
@@ -49,9 +53,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
  * Manager to handle IGFS client closures.
  */
 public class IgfsClientManager extends IgfsManager {
-    /** Pending input operations received when manager is not started yet. */
-    private final ConcurrentLinkedDeque<IgfsClientInOperation> pending = new ConcurrentLinkedDeque<>();
-
     /** Outgoing operations. */
     private final Map<Long, IgfsClientOutOperation> outOps = new ConcurrentHashMap<>();
 
@@ -74,6 +75,15 @@ public class IgfsClientManager extends IgfsManager {
     /** IO message listener. */
     private final MessageListener msgLsnr = new MessageListener();
 
+    /** Pending input operations received when manager is not started yet. */
+    private final ConcurrentLinkedDeque<IgfsClientInOperation> pending = new ConcurrentLinkedDeque<>();
+
+    /** Worker to process pending requests. */
+    private final PendingRequestsWorker pendingWorker = new PendingRequestsWorker();
+
+    /** Whether pending requests worker started. */
+    private boolean pendingWorkerStarted;
+
     /**
      * Constructor.
      *
@@ -100,7 +110,9 @@ public class IgfsClientManager extends IgfsManager {
             ready = true;
 
             if (!pending.isEmpty()) {
-                // TODO: Start separate thread.
+                new IgniteThread(pendingWorker).start();
+
+                pendingWorkerStarted = true;
             }
         }
         finally {
@@ -114,17 +126,23 @@ public class IgfsClientManager extends IgfsManager {
 
         ctx.io().removeMessageListener(GridTopic.TOPIC_IGFS_CLI, msgLsnr);
 
+        boolean pendingWorkerStarted0;
+
         rwLock.writeLock().lock();
 
         try {
             stopping = true;
+
+            pendingWorkerStarted0 = pendingWorkerStarted;
         }
         finally {
             rwLock.writeLock().unlock();
         }
 
-        // Stop pending worker (if any).
-        // TODO
+        if (pendingWorkerStarted0) {
+            U.cancel(pendingWorker);
+            U.join(pendingWorker, log);
+        }
     }
 
     /** {@inheritDoc} */
@@ -214,7 +232,7 @@ public class IgfsClientManager extends IgfsManager {
                 processRequest(nodeId, req); // Normal execution flow.
             else
                 // Add to pending set if manager is not operational yet.
-                pending.add(new IgfsClientInOperation(nodeId, req));
+                pending.addLast(new IgfsClientInOperation(nodeId, req));
         }
         finally {
             rwLock.readLock().unlock();
@@ -321,7 +339,7 @@ public class IgfsClientManager extends IgfsManager {
     }
 
     /**
-     * Handles job execution requests.
+     * Message listener.
      */
     private class MessageListener implements GridMessageListener {
         /** {@inheritDoc} */
@@ -347,7 +365,7 @@ public class IgfsClientManager extends IgfsManager {
             switch (evt.type()) {
                 case EVT_NODE_LEFT:
                 case EVT_NODE_FAILED:
-                    DiscoveryEvent evt0 = (DiscoveryEvent)evt;
+                    DiscoveryEvent evt0 = (DiscoveryEvent) evt;
 
                     onNodeLeft(evt0.eventNode().id());
 
@@ -355,6 +373,31 @@ public class IgfsClientManager extends IgfsManager {
 
                 default:
                     assert false : "Unknown event: " + evt;
+            }
+        }
+    }
+
+    /**
+     * Pending requests worker.
+     */
+    private class PendingRequestsWorker extends GridWorker {
+        /**
+         * Constructor.
+         *
+         * @param gridName Grid name.
+         * @param name Worker name.
+         * @param log Logger.
+         */
+        public PendingRequestsWorker(@Nullable String gridName, String name, IgniteLogger log) {
+            super(gridName, name, log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            IgfsClientInOperation inOp;
+
+            while ((inOp = pending.pollFirst()) != null)
+                processRequest(inOp.nodeId(), inOp.request());
         }
     }
 }


Mime
View raw message