ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [1/4] ignite git commit: Implemented request send routine.
Date Thu, 28 Jul 2016 12:43:47 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3553 6826e3d13 -> 7c801e756


Implemented request send routine.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c3af5744
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c3af5744
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c3af5744

Branch: refs/heads/ignite-3553
Commit: c3af57447d41539acd456ebdc04a82f7103c2408
Parents: 6826e3d
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Thu Jul 28 14:45:05 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Thu Jul 28 14:45:05 2016 +0300

----------------------------------------------------------------------
 .../igfs/client/IgfsClientManager.java          | 50 +++++++++++++++-----
 .../igfs/client/IgfsClientRequest.java          |  5 +-
 2 files changed, 40 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c3af5744/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
index 0686bfa..e8a19e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsContext;
 import org.apache.ignite.internal.processors.igfs.IgfsImpl;
 import org.apache.ignite.internal.processors.igfs.IgfsManager;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -45,6 +46,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -72,6 +74,9 @@ public class IgfsClientManager extends IgfsManager {
     /** Pending input operations received when manager is not started yet. */
     private final ConcurrentLinkedDeque<IgfsClientInOperation> pendingOps = new ConcurrentLinkedDeque<>();
 
+    /** Message ID generator. */
+    private final AtomicLong msgIdGen = new AtomicLong();
+
     /** Worker to process pending requests. */
     private PendingRequestsWorker pendingWorker;
 
@@ -191,19 +196,44 @@ public class IgfsClientManager extends IgfsManager {
      * @param strategy Node selection strategy.
      * @return Future.
      */
+    @SuppressWarnings("unchecked")
     public <T> IgniteInternalFuture<T> executeAsync(IgfsContext igfsCtx, IgfsClientAbstractCallable<T>
clo,
         IgfsClientNodeSelectionStrategy strategy) {
-        try {
+        while (true) {
+            rwLock.readLock().lock();
 
-            ClusterNode node = selectNode(igfsCtx, strategy);
-        }
-        catch (IgniteCheckedException e) {
-            // TODO
-        }
+            try {
+                // Get suitable node.
+                ClusterNode node = selectNode(igfsCtx, strategy);
 
-        // TODO
+                if (node == null)
+                    throw new IgfsException("Failed to execute operation because there are
no IGFS metadata nodes [igfs="
+                        + igfsCtx.igfs().name() + ']');
 
-        return null;
+                // Add operation to pending set.
+                long msgId = msgIdGen.incrementAndGet();
+
+                IgfsClientOutOperation op = new IgfsClientOutOperation(node.id(), clo, new
GridFutureAdapter());
+
+                outOps.put(msgId, op);
+
+                // Send request.
+                try {
+                    ctx.io().send(node, GridTopic.TOPIC_IGFS_CLI, new IgfsClientRequest(msgId,
clo),
+                        GridIoPolicy.PUBLIC_POOL);
+
+                    return op.future();
+                }
+                catch (IgniteCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send message to node, will retry [nodeId=" +
node.id() +
+                            ", err=" + e + ']');
+                }
+            }
+            finally {
+                rwLock.readLock().unlock();
+            }
+        }
     }
 
     /**
@@ -212,10 +242,8 @@ public class IgfsClientManager extends IgfsManager {
      * @param igfsCtx IGFS context.
      * @param strategy Strategy.
      * @return Node.
-     * @throws IgniteCheckedException If failed to find the node.
      */
-    private ClusterNode selectNode(IgfsContext igfsCtx, IgfsClientNodeSelectionStrategy strategy)
-        throws IgniteCheckedException {
+    @Nullable private ClusterNode selectNode(IgfsContext igfsCtx, IgfsClientNodeSelectionStrategy
strategy) {
         // TODO
 
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c3af5744/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java
index f7f7ebf..21bf57e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java
@@ -24,7 +24,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 import java.nio.ByteBuffer;
-import java.util.UUID;
 
 /**
  * IGFS client closure execute request.
@@ -50,12 +49,10 @@ public class IgfsClientRequest implements Message {
     /**
      * Constructor.
      *
-     * @param nodeId Originating node ID.
      * @param msgId Message ID.
      * @param target Target callable.
      */
-    public IgfsClientRequest(UUID nodeId, long msgId, IgfsClientAbstractCallable target)
{
-        assert nodeId != null;
+    public IgfsClientRequest(long msgId, IgfsClientAbstractCallable target) {
         assert target != null;
 
         this.msgId = msgId;


Mime
View raw message