ignite-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [ignite] alex-plekhanov commented on a change in pull request #5619: IGNITE-10619: CommunicationSpi support channels initial commit
Date Wed, 07 Aug 2019 16:01:46 GMT
alex-plekhanov commented on a change in pull request #5619: IGNITE-10619: CommunicationSpi
support channels initial commit
URL: https://github.com/apache/ignite/pull/5619#discussion_r311616684
 
 

 ##########
 File path: modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 ##########
 @@ -2368,6 +2988,356 @@ public void dumpStats() {
         X.println(">>>  discoWaitMapSize: " + waitMap.size());
     }
 
+    /**
+     * Read context holds all the information about current transfer read from channel process.
+     */
+    private static class ReceiverContext {
+        /** The remote node input channel came from. */
+        private final UUID nodeId;
+
+        /** Current sesssion handler. */
+        @GridToStringExclude
+        private final TransmissionHandler hnd;
+
+        /** Unique session request id. */
+        private final IgniteUuid sesId;
+
+        /** Flag indicates that current file handling process must be interrupted. */
+        private final AtomicBoolean interrupted = new AtomicBoolean();
+
+        /** Only one thread can handle receiver context. */
+        private final Lock lock = new ReentrantLock();
+
+        /** Last infinished downloading object. */
+        private TransmissionReceiver rcv;
+
+        /** Last saved state about file data processing. */
+        private TransmissionMeta lastState;
+
+        /** Close receiver timeout object. */
+        private GridTimeoutObject timeoutObj;
+
+        /**
+         * @param nodeId Remote node id.
+         * @param hnd Channel handler of current topic.
+         * @param sesId Unique session request id.
+         */
+        public ReceiverContext(UUID nodeId, TransmissionHandler hnd, IgniteUuid sesId) {
+            this.nodeId = nodeId;
+            this.hnd = hnd;
+            this.sesId = sesId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ReceiverContext.class, this);
+        }
+    }
+
+    /**
+     * –°lass represents an implementation of transmission file writer. Each new instance
of transmission sender
+     * will establish a new connection with unique transmission session identifier to the
remote node and given
+     * topic (an arbitraty {@link GridTopic} can be used).
+     *
+     * <h2>Zero-copy approach</h2>
+     * <p>
+     * Current implementation of transmission sender is based on file zero-copy algorithm
(the {@link FileSender}
+     * is used under the hood). It is potentially much more efficient than a simple loop
that reads data from
+     * given file and writes it to the target socket channel. But if operating system does
not support zero-copy
+     * file transfer, sending a file with {@link TransmissionSender} might fail or yield
worse performance.
+     * <p>
+     * Please, refer to <a href="http://en.wikipedia.org/wiki/Zero-copy">http://en.wikipedia.org/wiki/Zero-copy</a>
+     * or {@link FileChannel#transferTo(long, long, WritableByteChannel)} for details of
such approach.
+     *
+     * <h2>File and Chunk handlers</h2>
+     * <p>
+     * It is possible to choose a file handler prior to sendig the file to remote node  within
opened transmission
+     * session. There are two types of handlers available:
+     * {@link TransmissionHandler#chunkHandler(UUID, TransmissionMeta)} and
+     * {@link TransmissionHandler#fileHandler(UUID, TransmissionMeta)}. You can use an appropriate
+     * {@link TransmissionPolicy} for {@link #send(File, long, long, Map, TransmissionPolicy)}
method to switch
+     * between them.
+     *
+     * <h2>Exceptions handling</h2>
+     * <p>
+     * The transmission can have two different levels of exception which are handled differently:
+     * <ul>
+     * <li><em>transport</em> exception(e.g. some network issues)</li>
+     * <li><em>application</em>\<em>handler</em> level exception</li>
+     * </ul>
+     *
+     * <h3><em>Application</em> exceptions</h3>
+     * <p>
+     * The transmission will be stopped immediately and wrapping <em>IgniteCheckedExcpetion</em>
thrown in case of
+     * any <em>application</em> exception occured.
+     *
+     * <h3><em>Transport</em> exceptions</h3>
+     * <p>
+     * All transport level exceptions of transmission file sender will require transmission
to be reconnected.
+     * For instance, when the local node closes the socket connection in orderly way, but
the file is not fully
+     * handled by remote node, the read operation over the same socket endpoint will return
<tt>-1</tt>. Such
+     * result will be consideread as an <em>IOException</em> by handler and it
will wait for reestablishing connection
+     * to continue file loading.
+     * <p>
+     * Another example, the transmission sender gets the <em>Connection reset by peer</em>
IOException message.
+     * This means that the remote node you are connected to has to reset the connection.
This is usually caused by a
+     * high amount of traffic on the host, but may be caused by a server error or the remote
node has exhausted
+     * system resources as well. Such <em>IOException</em> will be considered
as <em>reconnect required</em>.
+     *
+     * <h3>Timeout exceptions</h3>
+     * <p>
+     * For read operations over the {@link InputStream} or write operation through the {@link
OutputStream}
+     * the {@link Socket#setSoTimeout(int)} will be used and an {@link SocketTimeoutException}
will be
+     * thrown when the timeout occured. The default value is taken from {@link IgniteConfiguration#getNetworkTimeout()}.
+     *
+     * <h2>Release resources</h2>
+     * <p>
+     * It is important to call <em>close()</em> method or use <em>try-with-resource</em>
statement to release
+     * all resources once you've done with sending files.
+     *
+     * @see FileChannel#transferTo(long, long, WritableByteChannel)
+     */
+    public class TransmissionSender implements Closeable {
+        /** Remote node id to connect to. */
+        private final UUID rmtId;
+
+        /** Remote topic to connect to. */
+        private final Object topic;
+
+        /** Current unique session identifier to transfer files to remote node. */
+        private T2<UUID, IgniteUuid> sesKey;
+
+        /** Instance of opened writable channel to work with. */
+        private WritableByteChannel channel;
+
+        /** Decorated with data operations socket of output channel. */
+        private ObjectOutput out;
+
+        /** Decoreated with data operations socket of input channel. */
+        private ObjectInput in;
+
+        /**
+         * @param rmtId The remote note to connect to.
+         * @param topic The remote topic to connect to.
+         */
+        public TransmissionSender(
+            UUID rmtId,
+            Object topic
+        ) {
+            this.rmtId = rmtId;
+            this.topic = topic;
+            sesKey = new T2<>(rmtId, IgniteUuid.randomUuid());
+        }
+
+        /**
+         * @return The syncronization meta if case connection has been reset.
+         * @throws IgniteCheckedException If fails.
+         * @throws IOException If fails.
+         */
+        private TransmissionMeta connect() throws IgniteCheckedException, IOException {
+            SocketChannel channel = (SocketChannel)openChannel(rmtId,
+                topic,
+                new SessionChannelMessage(sesKey.get2()))
+                .get();
+
+            configureChannel(channel, netTimeoutMs);
+
+            this.channel = (WritableByteChannel)channel;
+            out = new ObjectOutputStream(channel.socket().getOutputStream());
+            in = new ObjectInputStream(channel.socket().getInputStream());
+
+            TransmissionMeta syncMeta;
+
+            try {
+                // Synchronize state between remote and local nodes.
+                syncMeta = (TransmissionMeta)in.readObject();
+            }
+            catch (ClassNotFoundException e) {
+                throw new IgniteException (e);
+            }
+
+            return syncMeta;
+        }
+
+        /**
+         * @param file Source file to send to remote.
+         * @param params Additional transfer file description keys.
+         * @param plc The policy of handling data on remote.
+         * @throws IgniteCheckedException If fails.
+         */
+        public void send(
+            File file,
+            Map<String, Serializable> params,
+            TransmissionPolicy plc
+        ) throws IgniteCheckedException, InterruptedException, IOException {
+            send(file, 0, file.length(), params, plc);
+        }
+
+        /**
+         * @param file Source file to send to remote.
+         * @param plc The policy of handling data on remote.
+         * @throws IgniteCheckedException If fails.
+         */
+        public void send(
+            File file,
+            TransmissionPolicy plc
+        ) throws IgniteCheckedException, InterruptedException, IOException {
+            send(file, 0, file.length(), new HashMap<>(), plc);
+        }
+
+        /**
+         * @param file Source file to send to remote.
+         * @param offset Position to start trasfer at.
+         * @param cnt Number of bytes to transfer.
+         * @param params Additional transfer file description keys.
+         * @param plc The policy of handling data on remote.
+         * @throws IgniteCheckedException If fails.
+         */
+        public void send(
+            File file,
+            long offset,
+            long cnt,
+            Map<String, Serializable> params,
+            TransmissionPolicy plc
+        ) throws IgniteCheckedException, InterruptedException, IOException {
+            long startTime = U.currentTimeMillis();
+            int retries = 0;
+
+            senderStopFlags.putIfAbsent(sesKey, new AtomicBoolean());
+
+            try (FileSender snd = new FileSender(file,
+                offset,
+                cnt,
+                params,
+                plc,
+                () -> stopping || senderStopFlags.get(sesKey).get(),
+                log,
+                fileIoFactory,
+                DFLT_CHUNK_SIZE_BYTES)
+            ) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Start writing file to remote node [file=" + file.getName()
+
+                        ", rmtNodeId=" + rmtId + ", topic=" + topic + ']');
+                }
+
+                while (true) {
+                    if (Thread.currentThread().isInterrupted())
+                        throw new InterruptedException("The thread has been interrupted.
Stop uploading file.");
+
+                    if (stopping)
+                        throw new NodeStoppingException("Operation has been cancelled (node
is stopping)");
+
+                    try {
+                        TransmissionMeta rcvMeta = null;
+
+                        // in/out streams are not null if file has been sent successfully
+                        if (out == null && in == null) {
+                            rcvMeta = connect();
+
+                            assert rcvMeta != null : "Remote recevier has not sent its meta";
+
+                            // Stop in case of any error occurred on remote node during file
processing.
+                            if (rcvMeta.error() != null)
+                                throw rcvMeta.error();
+                        }
+
+                        snd.send(channel, out, rcvMeta);
+
+                        // Read file received acknowledge.
+                        boolean written = in.readBoolean();
+
+                        assert written : "File is not fully written :" + file.getAbsolutePath();
+
+                        break;
+                    }
+                    catch (IOException e) {
+                        closeChannelQuiet();
+
+                        retries++;
+
+                        if (retries >= retryCnt) {
+                            throw new IgniteException("The number of retry attempts to upload
file exceeded " +
+                                "the limit: " + retryCnt, e);
+                        }
+
+                        // Re-establish the new connection to continue upload.
+                        U.warn(log, "Connection lost while writing a file to remote node
and " +
+                            "will be reestablished [rmtId=" + rmtId + ", file=" + file.getName()
+
+                            ", sesKey=" + sesKey + ", retries=" + retries +
+                            ", transferred=" + snd.transferred() +
+                            ", total=" + snd.state().count() + ']', e);
+                    }
+                }
+
+                U.log(log, "File has been sent to remote node [name=" + file.getName() +
+                    ", uploadTime=" + (double)((U.currentTimeMillis() - startTime) / 1000)
+ " sec, retries=" + retries +
+                    ", transferred=" + snd.transferred() + ", rmtId=" + rmtId +']');
+
+            }
+            catch (InterruptedException e) {
+                closeChannelQuiet();
+
+                throw e;
+            }
+            catch (IgniteCheckedException e) {
+                closeChannelQuiet();
+
+                throw new IgniteCheckedException("Excpetion while sending file [rmtId=" +
rmtId +
 
 Review comment:
   "Exception while ..."

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message