ignite-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [ignite] Mmuzaf commented on a change in pull request #5619: IGNITE-10619: CommunicationSpi support channels initial commit
Date Thu, 08 Aug 2019 09:03:57 GMT
Mmuzaf commented on a change in pull request #5619: IGNITE-10619: CommunicationSpi support
channels initial commit
URL: https://github.com/apache/ignite/pull/5619#discussion_r311929728
 
 

 ##########
 File path: modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 ##########
 @@ -2348,6 +2686,288 @@ public int getOutboundMessagesQueueSize() {
         return getSpi().getOutboundMessagesQueueSize();
     }
 
+    /**
+     * Interrupts the given receiver context with the given exception.
+     * <p>
+     * Does nothing when {@code rctx} is {@code null}. Otherwise the cleanup below is performed only by
+     * the call that switches {@code rctx.interrupted} from {@code false} to {@code true}; any later call
+     * for the same context finds the flag already set and skips the whole body.
+     * <p>
+     * The cleanup, in order: removes {@code rctx.timeoutObj} from the timeout processor if one is set,
+     * closes {@code rctx.rcv} via {@code U.closeQuiet}, stores the exception in {@code rctx.lastState}
+     * (a new {@code TransmissionMeta} when there was no previous state, otherwise the result of
+     * {@code lastState.error(ex)}), passes the exception to {@code rctx.hnd.onException} together with
+     * {@code rctx.nodeId}, and finally logs the error.
+     *
+     * @param rctx Receiver context to use.
+     * @param ex Exception to close receiver with.
+     */
+    private void interruptRecevier(ReceiverContext rctx, Exception ex) {
+        if (rctx == null)
+            return;
+
+        if (rctx.interrupted.compareAndSet(false, true)) {
+            if (rctx.timeoutObj != null)
+                ctx.timeout().removeTimeoutObject(rctx.timeoutObj);
+
+            U.closeQuiet(rctx.rcv);
+
+            rctx.lastState = rctx.lastState == null ?
+                new TransmissionMeta(ex) : rctx.lastState.error(ex);
+
+            rctx.hnd.onException(rctx.nodeId, ex);
+
+            U.error(log, "Receiver has been interrupted due to an excpetion occurred [ctx="
+ rctx + ']', ex);
+        }
+    }
+
+    /**
+     * @param topic Topic to which the channel is created.
+     * @param nodeId Remote node id.
+     * @param initMsg Channel initialization message with additional params.
+     * @param ch Channel instance.
+     */
+    private void processOpenedChannel(Object topic, UUID nodeId, SessionChannelMessage initMsg,
SocketChannel ch) {
+        ReceiverContext rcvCtx = null;
+        ObjectInputStream in = null;
+        ObjectOutputStream out = null;
+
+        try {
+            if (stopping) {
+                throw new NodeStoppingException("Local node is stopping. Channel will be
closed [topic=" + topic +
+                    ", channel=" + ch + ']');
+            }
+
+            if (initMsg == null || initMsg.sesId() == null) {
+                U.warn(log, "There is no initial message provied for given topic. Opened
channel will be closed " +
+                    "[nodeId=" + nodeId + ", topic=" + topic + ", initMsg=" + initMsg + ']');
+
+                return;
+            }
+
+            configureChannel(ch, netTimeoutMs);
+
+            in = new ObjectInputStream(ch.socket().getInputStream());
+            out = new ObjectOutputStream(ch.socket().getOutputStream());
+
+            IgniteUuid newSesId = initMsg.sesId();
+
+            synchronized (rcvMux) {
+                TransmissionHandler hnd = topicTransmissionHnds.get(topic);
+
+                if (hnd == null) {
+                    U.warn(log, "There is no handler for a given topic. Channel will be closed
[nodeId=" + nodeId +
+                        ", topic=" + topic + ']');
+
+                    return;
+                }
+
+                rcvCtx = rcvCtxs.computeIfAbsent(topic, t -> new ReceiverContext(nodeId,
hnd, newSesId));
+            }
+
+            // Do not allow multiple connection for the same session
+            if (!newSesId.equals(rcvCtx.sesId)) {
+                IgniteCheckedException err = new IgniteCheckedException("Requested topic
is busy by another transmission. " +
+                    "It's not allowed to process different sessions over the same topic simultaneously.
" +
+                    "Channel will be closed [initMsg=" + initMsg + ", channel=" + ch + ",
nodeId=" + nodeId + ']');
+
+                U.error(log, err);
+
+                out.writeObject(new TransmissionMeta(err));
+
+                return;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Trasmission open a new channel [nodeId=" + nodeId + ", topic="
+ topic +
+                    ", initMsg=" + initMsg + ']');
+            }
+
+            if (!rcvCtx.lock.tryLock(netTimeoutMs, TimeUnit.MILLISECONDS))
+                throw new IgniteException("Wait for the previous receiver finished its work
timeouted: " + rcvCtx);
+
+            try {
+                if (rcvCtx.timeoutObj != null)
+                    ctx.timeout().removeTimeoutObject(rcvCtx.timeoutObj);
+
+                // Send previous context state to sync remote and local node (on manager
connected).
+                out.writeObject(rcvCtx.lastState == null ? new TransmissionMeta() : rcvCtx.lastState);
+
+                if (rcvCtx.lastState == null || rcvCtx.lastState.error() == null)
+                    receiveFromChannel(topic, rcvCtx, in, out, ch);
+                else
+                    interruptRecevier(rcvCtxs.remove(topic), rcvCtx.lastState.error());
+            }
+            finally {
+                rcvCtx.lock.unlock();
+            }
+        }
+        catch (Throwable t) {
+            U.error(log, "Download session cannot be finished due to an unexpected error
[ctx=" + rcvCtx + ']', t);
+
+            // Do not remove receiver context here, since sender will recconect to get this
error
 
 Review comment:
   fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message